1use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23 HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, ResponseHeader, Role,
24 PROTOCOL_VERSION,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28 CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29 CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::{Instruction, InstructionReply};
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{oneshot, watch, Notify, RwLock};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63 BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89 fn is_acceptable(&self, role: Role) -> bool;
90
91 fn name(&self) -> &'static str {
92 let type_name = std::any::type_name::<Self>();
93 type_name.split("::").last().unwrap_or(type_name)
95 }
96
97 async fn handle(
98 &self,
99 req: &HeartbeatRequest,
100 ctx: &mut Context,
101 acc: &mut HeartbeatAccumulator,
102 ) -> Result<HandleControl>;
103}
104
105#[derive(PartialEq, Debug)]
109pub enum HandleControl {
110 Continue,
111 Done,
112}
113
114#[derive(Debug, Default)]
115pub struct HeartbeatAccumulator {
116 pub header: Option<ResponseHeader>,
117 pub instructions: Vec<Instruction>,
118 pub stat: Option<Stat>,
119 pub inactive_region_ids: HashSet<RegionId>,
120 pub region_lease: Option<RegionLease>,
121}
122
123impl HeartbeatAccumulator {
124 pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
125 None
127 }
128}
129
130#[derive(Copy, Clone)]
131pub struct PusherId {
132 pub role: Role,
133 pub id: u64,
134}
135
136impl Debug for PusherId {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(f, "{:?}-{}", self.role, self.id)
139 }
140}
141
142impl Display for PusherId {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 write!(f, "{:?}-{}", self.role, self.id)
145 }
146}
147
148impl PusherId {
149 pub fn new(role: Role, id: u64) -> Self {
150 Self { role, id }
151 }
152
153 pub fn string_key(&self) -> String {
154 format!("{}-{}", self.role as i32, self.id)
155 }
156}
157
158pub type DeregisterSignalReceiver = watch::Receiver<bool>;
160
161pub struct Pusher {
163 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
164 deregister_signal_sender: watch::Sender<bool>,
168 deregister_signal_receiver: DeregisterSignalReceiver,
169
170 res_header: ResponseHeader,
171}
172
173impl Drop for Pusher {
174 fn drop(&mut self) {
175 let _ = self.deregister_signal_sender.send(true);
178 }
179}
180
181impl Pusher {
182 pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
183 let res_header = ResponseHeader {
184 protocol_version: PROTOCOL_VERSION,
185 ..Default::default()
186 };
187 let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
188 Self {
189 sender,
190 deregister_signal_sender,
191 deregister_signal_receiver,
192 res_header,
193 }
194 }
195
196 #[inline]
197 pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
198 self.sender.send(Ok(res)).await.map_err(|e| {
199 error::PushMessageSnafu {
200 err_msg: e.to_string(),
201 }
202 .build()
203 })
204 }
205
206 #[inline]
207 pub fn header(&self) -> ResponseHeader {
208 self.res_header.clone()
209 }
210}
211
212#[derive(Clone, Default)]
214pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
215
216impl Pushers {
217 async fn push(
218 &self,
219 pusher_id: PusherId,
220 mailbox_message: MailboxMessage,
221 ) -> Result<DeregisterSignalReceiver> {
222 let pusher_id = pusher_id.string_key();
223 let pushers = self.0.read().await;
224 let pusher = pushers
225 .get(&pusher_id)
226 .context(error::PusherNotFoundSnafu { pusher_id })?;
227
228 pusher
229 .push(HeartbeatResponse {
230 header: Some(pusher.header()),
231 mailbox_message: Some(mailbox_message),
232 ..Default::default()
233 })
234 .await?;
235
236 Ok(pusher.deregister_signal_receiver.clone())
237 }
238
239 async fn broadcast(
240 &self,
241 range: Range<String>,
242 mailbox_message: &MailboxMessage,
243 ) -> Result<()> {
244 let pushers = self.0.read().await;
245 let pushers = pushers
246 .range(range)
247 .map(|(_, value)| value)
248 .collect::<Vec<_>>();
249 let mut results = Vec::with_capacity(pushers.len());
250
251 for pusher in pushers {
252 let mut mailbox_message = mailbox_message.clone();
253 mailbox_message.id = 0; results.push(pusher.push(HeartbeatResponse {
256 header: Some(pusher.header()),
257 mailbox_message: Some(mailbox_message),
258 ..Default::default()
259 }))
260 }
261
262 let _ = join_all(results)
264 .await
265 .into_iter()
266 .collect::<Result<Vec<_>>>()?;
267
268 Ok(())
269 }
270
271 pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
272 self.0.write().await.insert(pusher_id, pusher)
273 }
274
275 async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
276 self.0.write().await.remove(pusher_id)
277 }
278}
279
280#[derive(Clone)]
281pub struct NameCachedHandler {
282 name: &'static str,
283 handler: Arc<dyn HeartbeatHandler>,
284}
285
286impl NameCachedHandler {
287 fn new(handler: impl HeartbeatHandler + 'static) -> Self {
288 let name = handler.name();
289 let handler = Arc::new(handler);
290 Self { name, handler }
291 }
292}
293
294pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
295
296#[derive(Default, Clone)]
298pub struct HeartbeatHandlerGroup {
299 handlers: Vec<NameCachedHandler>,
300 pushers: Pushers,
301}
302
303impl HeartbeatHandlerGroup {
304 pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
306 METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
307 info!("Pusher register: {}", pusher_id);
308 let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
309 }
310
311 pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
315 METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
316 info!("Pusher unregister: {}", pusher_id);
317 self.pushers.remove(&pusher_id.string_key()).await
318 }
319
320 pub fn pushers(&self) -> Pushers {
322 self.pushers.clone()
323 }
324
325 pub async fn handle(
327 &self,
328 req: HeartbeatRequest,
329 mut ctx: Context,
330 ) -> Result<HeartbeatResponse> {
331 let mut acc = HeartbeatAccumulator::default();
332 let role = req
333 .header
334 .as_ref()
335 .and_then(|h| Role::try_from(h.role).ok())
336 .context(error::InvalidArgumentsSnafu {
337 err_msg: format!("invalid role: {:?}", req.header),
338 })?;
339
340 for NameCachedHandler { name, handler } in self.handlers.iter() {
341 if !handler.is_acceptable(role) {
342 continue;
343 }
344
345 let _timer = METRIC_META_HANDLER_EXECUTE
346 .with_label_values(&[*name])
347 .start_timer();
348
349 if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
350 break;
351 }
352 }
353 let header = std::mem::take(&mut acc.header);
354 let res = HeartbeatResponse {
355 header,
356 region_lease: acc.region_lease,
357 ..Default::default()
358 };
359 Ok(res)
360 }
361}
362
363pub struct HeartbeatMailbox {
364 pushers: Pushers,
365 sequence: Sequence,
366 senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
367 timeouts: DashMap<MessageId, Instant>,
368 timeout_notify: Notify,
369}
370
371impl HeartbeatMailbox {
372 pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
373 let Payload::Json(payload) =
374 msg.payload
375 .as_ref()
376 .with_context(|| UnexpectedInstructionReplySnafu {
377 mailbox_message: msg.to_string(),
378 reason: format!("empty payload, msg: {msg:?}"),
379 })?;
380 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
381 }
382
383 #[cfg(test)]
385 pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
386 let Payload::Json(payload) =
387 msg.payload
388 .as_ref()
389 .with_context(|| UnexpectedInstructionReplySnafu {
390 mailbox_message: msg.to_string(),
391 reason: format!("empty payload, msg: {msg:?}"),
392 })?;
393 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
394 }
395
396 pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
397 let mailbox = Arc::new(Self::new(pushers, sequence));
398
399 let timeout_checker = mailbox.clone();
400 let _handle = common_runtime::spawn_global(async move {
401 timeout_checker.check_timeout_bg(10).await;
402 });
403
404 mailbox
405 }
406
407 fn new(pushers: Pushers, sequence: Sequence) -> Self {
408 Self {
409 pushers,
410 sequence,
411 senders: DashMap::default(),
412 timeouts: DashMap::default(),
413 timeout_notify: Notify::new(),
414 }
415 }
416
417 async fn check_timeout_bg(&self, interval_millis: u64) {
418 let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
419
420 loop {
421 let _ = interval.tick().await;
422
423 if self.timeouts.is_empty() {
424 self.timeout_notify.notified().await;
425 }
426
427 let now = Instant::now();
428 let timeout_ids = self
429 .timeouts
430 .iter()
431 .filter_map(|entry| {
432 let (id, deadline) = entry.pair();
433 if deadline < &now {
434 Some(*id)
435 } else {
436 None
437 }
438 })
439 .collect::<Vec<_>>();
440
441 for id in timeout_ids {
442 let _ = self
443 .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
444 .await;
445 }
446 }
447 }
448
449 #[inline]
450 async fn next_message_id(&self) -> Result<u64> {
451 loop {
454 let next = self
455 .sequence
456 .next()
457 .await
458 .context(error::NextSequenceSnafu)?;
459 if next > 0 {
460 return Ok(next);
461 }
462 }
463 }
464}
465
466#[async_trait::async_trait]
467impl Mailbox for HeartbeatMailbox {
468 async fn send(
469 &self,
470 ch: &Channel,
471 mut msg: MailboxMessage,
472 timeout: Duration,
473 ) -> Result<MailboxReceiver> {
474 let message_id = self.next_message_id().await?;
475 msg.id = message_id;
476
477 let pusher_id = ch.pusher_id();
478 debug!("Sending mailbox message {msg:?} to {pusher_id}");
479
480 let (tx, rx) = oneshot::channel();
481 let _ = self.senders.insert(message_id, tx);
482 let deadline = Instant::now() + timeout;
483 self.timeouts.insert(message_id, deadline);
484 self.timeout_notify.notify_one();
485
486 let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
487
488 Ok(MailboxReceiver::new(
489 message_id,
490 rx,
491 deregister_signal_receiver,
492 *ch,
493 ))
494 }
495
496 async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
497 let message_id = 0; msg.id = message_id;
499
500 let pusher_id = ch.pusher_id();
501 debug!("Sending mailbox message {msg:?} to {pusher_id}");
502
503 self.pushers.push(pusher_id, msg).await?;
504
505 Ok(())
506 }
507
508 async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
509 self.pushers.broadcast(ch.pusher_range(), msg).await
510 }
511
512 async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
513 debug!("Received mailbox message {maybe_msg:?}");
514
515 let _ = self.timeouts.remove(&id);
516
517 if let Some((_, tx)) = self.senders.remove(&id) {
518 tx.send(maybe_msg)
519 .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
520 } else if let Ok(finally_msg) = maybe_msg {
521 warn!("The response arrived too late: {finally_msg:?}");
522 }
523
524 Ok(())
525 }
526}
527
528pub struct HeartbeatHandlerGroupBuilder {
530 region_failure_handler: Option<RegionFailureHandler>,
532
533 region_lease_handler: Option<RegionLeaseHandler>,
535
536 flush_stats_factor: Option<usize>,
540 flow_state_handler: Option<FlowStateHandler>,
542
543 persist_stats_handler: Option<PersistStatsHandler>,
545
546 plugins: Option<Plugins>,
548
549 pushers: Pushers,
551
552 handlers: Vec<NameCachedHandler>,
554}
555
556impl HeartbeatHandlerGroupBuilder {
557 pub fn new(pushers: Pushers) -> Self {
558 Self {
559 region_failure_handler: None,
560 region_lease_handler: None,
561 flush_stats_factor: None,
562 flow_state_handler: None,
563 persist_stats_handler: None,
564 plugins: None,
565 pushers,
566 handlers: vec![],
567 }
568 }
569
570 pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
571 self.flow_state_handler = handler;
572 self
573 }
574
575 pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
576 self.region_lease_handler = handler;
577 self
578 }
579
580 pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
582 self.region_failure_handler = handler;
583 self
584 }
585
586 pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
588 self.flush_stats_factor = flush_stats_factor;
589 self
590 }
591
592 pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
593 self.persist_stats_handler = handler;
594 self
595 }
596
597 pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
599 self.plugins = plugins;
600 self
601 }
602
603 pub fn add_default_handlers(mut self) -> Self {
605 let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
607 plugins
608 .get::<PublisherRef>()
609 .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
610 } else {
611 None
612 };
613
614 self.add_handler_last(ResponseHeaderHandler);
615 self.add_handler_last(DatanodeKeepLeaseHandler);
619 self.add_handler_last(FlownodeKeepLeaseHandler);
620 self.add_handler_last(CheckLeaderHandler);
621 self.add_handler_last(OnLeaderStartHandler);
622 self.add_handler_last(ExtractStatHandler);
623 self.add_handler_last(CollectDatanodeClusterInfoHandler);
624 self.add_handler_last(CollectFrontendClusterInfoHandler);
625 self.add_handler_last(CollectFlownodeClusterInfoHandler);
626 self.add_handler_last(MailboxHandler);
627 if let Some(region_lease_handler) = self.region_lease_handler.take() {
628 self.add_handler_last(region_lease_handler);
629 }
630 self.add_handler_last(FilterInactiveRegionStatsHandler);
631 if let Some(region_failure_handler) = self.region_failure_handler.take() {
632 self.add_handler_last(region_failure_handler);
633 }
634 if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
635 self.add_handler_last(publish_heartbeat_handler);
636 }
637 self.add_handler_last(CollectLeaderRegionHandler);
638 self.add_handler_last(CollectTopicStatsHandler);
639 if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
642 self.add_handler_last(persist_stats_handler);
643 }
644 self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
645 self.add_handler_last(RemapFlowPeerHandler::default());
646
647 if let Some(flow_state_handler) = self.flow_state_handler.take() {
648 self.add_handler_last(flow_state_handler);
649 }
650
651 self
652 }
653
654 pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
658 if let Some(customizer) = self
659 .plugins
660 .as_ref()
661 .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
662 {
663 debug!("Customizing the heartbeat handler group builder");
664 customizer.customize(&mut self)?;
665 }
666
667 Ok(HeartbeatHandlerGroup {
668 handlers: self.handlers,
669 pushers: self.pushers,
670 })
671 }
672
673 fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
674 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
675 self.handlers.insert(pos + 1, handler);
676 return Ok(());
677 }
678
679 error::HandlerNotFoundSnafu { name: target }.fail()
680 }
681
682 pub fn add_handler_after(
684 &mut self,
685 target: &'static str,
686 handler: impl HeartbeatHandler + 'static,
687 ) -> Result<()> {
688 self.add_handler_after_inner(target, NameCachedHandler::new(handler))
689 }
690
691 fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
692 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
693 self.handlers.insert(pos, handler);
694 return Ok(());
695 }
696
697 error::HandlerNotFoundSnafu { name: target }.fail()
698 }
699
700 pub fn add_handler_before(
702 &mut self,
703 target: &'static str,
704 handler: impl HeartbeatHandler + 'static,
705 ) -> Result<()> {
706 self.add_handler_before_inner(target, NameCachedHandler::new(handler))
707 }
708
709 fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
710 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
711 self.handlers[pos] = handler;
712 return Ok(());
713 }
714
715 error::HandlerNotFoundSnafu { name: target }.fail()
716 }
717
718 pub fn replace_handler(
720 &mut self,
721 target: &'static str,
722 handler: impl HeartbeatHandler + 'static,
723 ) -> Result<()> {
724 self.replace_handler_inner(target, NameCachedHandler::new(handler))
725 }
726
727 fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
728 self.handlers.push(handler);
729 }
730
731 fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
732 self.add_handler_last_inner(NameCachedHandler::new(handler));
733 }
734}
735
736pub type HeartbeatHandlerGroupBuilderCustomizerRef =
737 Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
738
739pub enum CustomizeHeartbeatGroupAction {
740 AddHandlerAfter {
741 target: String,
742 handler: NameCachedHandler,
743 },
744 AddHandlerBefore {
745 target: String,
746 handler: NameCachedHandler,
747 },
748 ReplaceHandler {
749 target: String,
750 handler: NameCachedHandler,
751 },
752 AddHandlerLast {
753 handler: NameCachedHandler,
754 },
755}
756
757impl CustomizeHeartbeatGroupAction {
758 pub fn new_add_handler_after(
759 target: &'static str,
760 handler: impl HeartbeatHandler + 'static,
761 ) -> Self {
762 Self::AddHandlerAfter {
763 target: target.to_string(),
764 handler: NameCachedHandler::new(handler),
765 }
766 }
767
768 pub fn new_add_handler_before(
769 target: &'static str,
770 handler: impl HeartbeatHandler + 'static,
771 ) -> Self {
772 Self::AddHandlerBefore {
773 target: target.to_string(),
774 handler: NameCachedHandler::new(handler),
775 }
776 }
777
778 pub fn new_replace_handler(
779 target: &'static str,
780 handler: impl HeartbeatHandler + 'static,
781 ) -> Self {
782 Self::ReplaceHandler {
783 target: target.to_string(),
784 handler: NameCachedHandler::new(handler),
785 }
786 }
787
788 pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
789 Self::AddHandlerLast {
790 handler: NameCachedHandler::new(handler),
791 }
792 }
793}
794
795pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
797 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
798
799 fn add_action(&self, action: CustomizeHeartbeatGroupAction);
800}
801
802#[derive(Default)]
803pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
804 actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
805}
806
807impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
808 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
809 info!("Customizing the heartbeat handler group builder");
810 let mut actions = self.actions.lock().unwrap();
811 for action in actions.drain(..) {
812 match action {
813 CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
814 builder.add_handler_after_inner(&target, handler)?;
815 }
816 CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
817 builder.add_handler_before_inner(&target, handler)?;
818 }
819 CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
820 builder.replace_handler_inner(&target, handler)?;
821 }
822 CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
823 builder.add_handler_last_inner(handler);
824 }
825 }
826 }
827 Ok(())
828 }
829
830 fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
831 self.actions.lock().unwrap().push(action);
832 }
833}
834
835#[cfg(test)]
836mod tests {
837
838 use std::assert_matches::assert_matches;
839 use std::sync::Arc;
840 use std::time::Duration;
841
842 use api::v1::meta::{MailboxMessage, Role};
843 use common_meta::kv_backend::memory::MemoryKvBackend;
844 use common_meta::sequence::SequenceBuilder;
845 use tokio::sync::mpsc;
846
847 use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
848 use crate::error;
849 use crate::handler::collect_stats_handler::CollectStatsHandler;
850 use crate::handler::response_header_handler::ResponseHeaderHandler;
851 use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
852 use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
853
854 #[tokio::test]
855 async fn test_mailbox() {
856 let (mailbox, receiver) = push_msg_via_mailbox().await;
857 let id = receiver.message_id();
858
859 let resp_msg = MailboxMessage {
860 id,
861 subject: "resp-test".to_string(),
862 timestamp_millis: 456,
863 ..Default::default()
864 };
865
866 mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
867
868 let recv_msg = receiver.await.unwrap();
869 assert_eq!(recv_msg.id, id);
870 assert_eq!(recv_msg.timestamp_millis, 456);
871 assert_eq!(recv_msg.subject, "resp-test".to_string());
872 }
873
874 #[tokio::test]
875 async fn test_mailbox_timeout() {
876 let (_, receiver) = push_msg_via_mailbox().await;
877 let res = receiver.await;
878 assert!(res.is_err());
879 }
880
881 async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
882 let datanode_id = 12;
883 let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
884 let pusher_id = PusherId::new(Role::Datanode, datanode_id);
885 let pusher: Pusher = Pusher::new(pusher_tx);
886 let handler_group = HeartbeatHandlerGroup::default();
887 handler_group.register_pusher(pusher_id, pusher).await;
888
889 let kv_backend = Arc::new(MemoryKvBackend::new());
890 let seq = SequenceBuilder::new("test_seq", kv_backend).build();
891 let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
892
893 let msg = MailboxMessage {
894 id: 0,
895 subject: "req-test".to_string(),
896 timestamp_millis: 123,
897 ..Default::default()
898 };
899 let ch = Channel::Datanode(datanode_id);
900
901 let receiver = mailbox
902 .send(&ch, msg, Duration::from_secs(1))
903 .await
904 .unwrap();
905
906 let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
907 let message = recv_obj.mailbox_message.unwrap();
908 assert_eq!(message.timestamp_millis, 123);
909 assert_eq!(message.subject, "req-test".to_string());
910
911 (mailbox, receiver)
912 }
913
914 #[test]
915 fn test_handler_group_builder() {
916 let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
917 .add_default_handlers()
918 .build()
919 .unwrap();
920
921 let handlers = group.handlers;
922 let names = [
923 "ResponseHeaderHandler",
924 "DatanodeKeepLeaseHandler",
925 "FlownodeKeepLeaseHandler",
926 "CheckLeaderHandler",
927 "OnLeaderStartHandler",
928 "ExtractStatHandler",
929 "CollectDatanodeClusterInfoHandler",
930 "CollectFrontendClusterInfoHandler",
931 "CollectFlownodeClusterInfoHandler",
932 "MailboxHandler",
933 "FilterInactiveRegionStatsHandler",
934 "CollectLeaderRegionHandler",
935 "CollectTopicStatsHandler",
936 "CollectStatsHandler",
937 "RemapFlowPeerHandler",
938 ];
939 assert_eq!(names.len(), handlers.len());
940 for (handler, name) in handlers.iter().zip(names.into_iter()) {
941 assert_eq!(handler.name, name);
942 }
943 }
944
945 #[test]
946 fn test_handler_group_builder_add_before() {
947 let mut builder =
948 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
949 builder
950 .add_handler_before(
951 "FilterInactiveRegionStatsHandler",
952 CollectStatsHandler::default(),
953 )
954 .unwrap();
955
956 let group = builder.build().unwrap();
957 let handlers = group.handlers;
958 let names = [
959 "ResponseHeaderHandler",
960 "DatanodeKeepLeaseHandler",
961 "FlownodeKeepLeaseHandler",
962 "CheckLeaderHandler",
963 "OnLeaderStartHandler",
964 "ExtractStatHandler",
965 "CollectDatanodeClusterInfoHandler",
966 "CollectFrontendClusterInfoHandler",
967 "CollectFlownodeClusterInfoHandler",
968 "MailboxHandler",
969 "CollectStatsHandler",
970 "FilterInactiveRegionStatsHandler",
971 "CollectLeaderRegionHandler",
972 "CollectTopicStatsHandler",
973 "CollectStatsHandler",
974 "RemapFlowPeerHandler",
975 ];
976 assert_eq!(names.len(), handlers.len());
977 for (handler, name) in handlers.iter().zip(names.into_iter()) {
978 assert_eq!(handler.name, name);
979 }
980 }
981
982 #[test]
983 fn test_handler_group_builder_add_before_first() {
984 let mut builder =
985 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
986 builder
987 .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
988 .unwrap();
989
990 let group = builder.build().unwrap();
991 let handlers = group.handlers;
992 let names = [
993 "CollectStatsHandler",
994 "ResponseHeaderHandler",
995 "DatanodeKeepLeaseHandler",
996 "FlownodeKeepLeaseHandler",
997 "CheckLeaderHandler",
998 "OnLeaderStartHandler",
999 "ExtractStatHandler",
1000 "CollectDatanodeClusterInfoHandler",
1001 "CollectFrontendClusterInfoHandler",
1002 "CollectFlownodeClusterInfoHandler",
1003 "MailboxHandler",
1004 "FilterInactiveRegionStatsHandler",
1005 "CollectLeaderRegionHandler",
1006 "CollectTopicStatsHandler",
1007 "CollectStatsHandler",
1008 "RemapFlowPeerHandler",
1009 ];
1010 assert_eq!(names.len(), handlers.len());
1011 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1012 assert_eq!(handler.name, name);
1013 }
1014 }
1015
1016 #[test]
1017 fn test_handler_group_builder_add_after() {
1018 let mut builder =
1019 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1020 builder
1021 .add_handler_after("MailboxHandler", CollectStatsHandler::default())
1022 .unwrap();
1023
1024 let group = builder.build().unwrap();
1025 let handlers = group.handlers;
1026 let names = [
1027 "ResponseHeaderHandler",
1028 "DatanodeKeepLeaseHandler",
1029 "FlownodeKeepLeaseHandler",
1030 "CheckLeaderHandler",
1031 "OnLeaderStartHandler",
1032 "ExtractStatHandler",
1033 "CollectDatanodeClusterInfoHandler",
1034 "CollectFrontendClusterInfoHandler",
1035 "CollectFlownodeClusterInfoHandler",
1036 "MailboxHandler",
1037 "CollectStatsHandler",
1038 "FilterInactiveRegionStatsHandler",
1039 "CollectLeaderRegionHandler",
1040 "CollectTopicStatsHandler",
1041 "CollectStatsHandler",
1042 "RemapFlowPeerHandler",
1043 ];
1044 assert_eq!(names.len(), handlers.len());
1045 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1046 assert_eq!(handler.name, name);
1047 }
1048 }
1049
1050 #[test]
1051 fn test_handler_group_builder_add_after_last() {
1052 let mut builder =
1053 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1054 builder
1055 .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
1056 .unwrap();
1057
1058 let group = builder.build().unwrap();
1059 let handlers = group.handlers;
1060 let names = [
1061 "ResponseHeaderHandler",
1062 "DatanodeKeepLeaseHandler",
1063 "FlownodeKeepLeaseHandler",
1064 "CheckLeaderHandler",
1065 "OnLeaderStartHandler",
1066 "ExtractStatHandler",
1067 "CollectDatanodeClusterInfoHandler",
1068 "CollectFrontendClusterInfoHandler",
1069 "CollectFlownodeClusterInfoHandler",
1070 "MailboxHandler",
1071 "FilterInactiveRegionStatsHandler",
1072 "CollectLeaderRegionHandler",
1073 "CollectTopicStatsHandler",
1074 "CollectStatsHandler",
1075 "ResponseHeaderHandler",
1076 "RemapFlowPeerHandler",
1077 ];
1078 assert_eq!(names.len(), handlers.len());
1079 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1080 assert_eq!(handler.name, name);
1081 }
1082 }
1083
1084 #[test]
1085 fn test_handler_group_builder_replace() {
1086 let mut builder =
1087 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1088 builder
1089 .replace_handler("MailboxHandler", CollectStatsHandler::default())
1090 .unwrap();
1091
1092 let group = builder.build().unwrap();
1093 let handlers = group.handlers;
1094 let names = [
1095 "ResponseHeaderHandler",
1096 "DatanodeKeepLeaseHandler",
1097 "FlownodeKeepLeaseHandler",
1098 "CheckLeaderHandler",
1099 "OnLeaderStartHandler",
1100 "ExtractStatHandler",
1101 "CollectDatanodeClusterInfoHandler",
1102 "CollectFrontendClusterInfoHandler",
1103 "CollectFlownodeClusterInfoHandler",
1104 "CollectStatsHandler",
1105 "FilterInactiveRegionStatsHandler",
1106 "CollectLeaderRegionHandler",
1107 "CollectTopicStatsHandler",
1108 "CollectStatsHandler",
1109 "RemapFlowPeerHandler",
1110 ];
1111
1112 assert_eq!(names.len(), handlers.len());
1113 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1114 assert_eq!(handler.name, name);
1115 }
1116 }
1117
1118 #[test]
1119 fn test_handler_group_builder_replace_last() {
1120 let mut builder =
1121 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1122 builder
1123 .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
1124 .unwrap();
1125
1126 let group = builder.build().unwrap();
1127 let handlers = group.handlers;
1128 let names = [
1129 "ResponseHeaderHandler",
1130 "DatanodeKeepLeaseHandler",
1131 "FlownodeKeepLeaseHandler",
1132 "CheckLeaderHandler",
1133 "OnLeaderStartHandler",
1134 "ExtractStatHandler",
1135 "CollectDatanodeClusterInfoHandler",
1136 "CollectFrontendClusterInfoHandler",
1137 "CollectFlownodeClusterInfoHandler",
1138 "MailboxHandler",
1139 "FilterInactiveRegionStatsHandler",
1140 "CollectLeaderRegionHandler",
1141 "CollectTopicStatsHandler",
1142 "ResponseHeaderHandler",
1143 "RemapFlowPeerHandler",
1144 ];
1145
1146 assert_eq!(names.len(), handlers.len());
1147 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1148 assert_eq!(handler.name, name);
1149 }
1150 }
1151
1152 #[test]
1153 fn test_handler_group_builder_replace_first() {
1154 let mut builder =
1155 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1156 builder
1157 .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
1158 .unwrap();
1159
1160 let group = builder.build().unwrap();
1161 let handlers = group.handlers;
1162 let names = [
1163 "CollectStatsHandler",
1164 "DatanodeKeepLeaseHandler",
1165 "FlownodeKeepLeaseHandler",
1166 "CheckLeaderHandler",
1167 "OnLeaderStartHandler",
1168 "ExtractStatHandler",
1169 "CollectDatanodeClusterInfoHandler",
1170 "CollectFrontendClusterInfoHandler",
1171 "CollectFlownodeClusterInfoHandler",
1172 "MailboxHandler",
1173 "FilterInactiveRegionStatsHandler",
1174 "CollectLeaderRegionHandler",
1175 "CollectTopicStatsHandler",
1176 "CollectStatsHandler",
1177 "RemapFlowPeerHandler",
1178 ];
1179 assert_eq!(names.len(), handlers.len());
1180 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1181 assert_eq!(handler.name, name);
1182 }
1183 }
1184
1185 #[test]
1186 fn test_handler_group_builder_handler_not_found() {
1187 let mut builder =
1188 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1189 let err = builder
1190 .add_handler_before("NotExists", CollectStatsHandler::default())
1191 .unwrap_err();
1192 assert_matches!(err, error::Error::HandlerNotFound { .. });
1193
1194 let err = builder
1195 .add_handler_after("NotExists", CollectStatsHandler::default())
1196 .unwrap_err();
1197 assert_matches!(err, error::Error::HandlerNotFound { .. });
1198
1199 let err = builder
1200 .replace_handler("NotExists", CollectStatsHandler::default())
1201 .unwrap_err();
1202 assert_matches!(err, error::Error::HandlerNotFound { .. });
1203 }
1204
1205 #[tokio::test]
1206 async fn test_pusher_drop() {
1207 let (tx, _rx) = mpsc::channel(1);
1208 let pusher = Pusher::new(tx);
1209 let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1210
1211 drop(pusher);
1212 deregister_signal_tx.changed().await.unwrap();
1213 }
1214}