1use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23 HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, ResponseHeader, Role,
24 PROTOCOL_VERSION,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28 CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29 CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::{Instruction, InstructionReply};
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{oneshot, watch, Notify, RwLock};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::flow_state_handler::FlowStateHandler;
57use crate::metasrv::Context;
58use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
59use crate::pubsub::PublisherRef;
60use crate::service::mailbox::{
61 BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
62};
63
64pub mod check_leader_handler;
65pub mod collect_cluster_info_handler;
66pub mod collect_leader_region_handler;
67pub mod collect_stats_handler;
68pub mod extract_stat_handler;
69pub mod failure_handler;
70pub mod filter_inactive_region_stats;
71pub mod flow_state_handler;
72pub mod keep_lease_handler;
73pub mod mailbox_handler;
74pub mod on_leader_start_handler;
75pub mod publish_heartbeat_handler;
76pub mod region_lease_handler;
77pub mod remap_flow_peer_handler;
78pub mod response_header_handler;
79
80#[async_trait::async_trait]
81pub trait HeartbeatHandler: Send + Sync {
82 fn is_acceptable(&self, role: Role) -> bool;
83
84 fn name(&self) -> &'static str {
85 let type_name = std::any::type_name::<Self>();
86 type_name.split("::").last().unwrap_or(type_name)
88 }
89
90 async fn handle(
91 &self,
92 req: &HeartbeatRequest,
93 ctx: &mut Context,
94 acc: &mut HeartbeatAccumulator,
95 ) -> Result<HandleControl>;
96}
97
98#[derive(PartialEq, Debug)]
102pub enum HandleControl {
103 Continue,
104 Done,
105}
106
107#[derive(Debug, Default)]
108pub struct HeartbeatAccumulator {
109 pub header: Option<ResponseHeader>,
110 pub instructions: Vec<Instruction>,
111 pub stat: Option<Stat>,
112 pub inactive_region_ids: HashSet<RegionId>,
113 pub region_lease: Option<RegionLease>,
114}
115
116impl HeartbeatAccumulator {
117 pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
118 None
120 }
121}
122
123#[derive(Copy, Clone)]
124pub struct PusherId {
125 pub role: Role,
126 pub id: u64,
127}
128
129impl Debug for PusherId {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 write!(f, "{:?}-{}", self.role, self.id)
132 }
133}
134
135impl Display for PusherId {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 write!(f, "{:?}-{}", self.role, self.id)
138 }
139}
140
141impl PusherId {
142 pub fn new(role: Role, id: u64) -> Self {
143 Self { role, id }
144 }
145
146 pub fn string_key(&self) -> String {
147 format!("{}-{}", self.role as i32, self.id)
148 }
149}
150
151pub type DeregisterSignalReceiver = watch::Receiver<bool>;
153
154pub struct Pusher {
156 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
157 deregister_signal_sender: watch::Sender<bool>,
161 deregister_signal_receiver: DeregisterSignalReceiver,
162
163 res_header: ResponseHeader,
164}
165
166impl Drop for Pusher {
167 fn drop(&mut self) {
168 let _ = self.deregister_signal_sender.send(true);
171 }
172}
173
174impl Pusher {
175 pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
176 let res_header = ResponseHeader {
177 protocol_version: PROTOCOL_VERSION,
178 ..Default::default()
179 };
180 let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
181 Self {
182 sender,
183 deregister_signal_sender,
184 deregister_signal_receiver,
185 res_header,
186 }
187 }
188
189 #[inline]
190 pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
191 self.sender.send(Ok(res)).await.map_err(|e| {
192 error::PushMessageSnafu {
193 err_msg: e.to_string(),
194 }
195 .build()
196 })
197 }
198
199 #[inline]
200 pub fn header(&self) -> ResponseHeader {
201 self.res_header.clone()
202 }
203}
204
205#[derive(Clone, Default)]
207pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
208
209impl Pushers {
210 async fn push(
211 &self,
212 pusher_id: PusherId,
213 mailbox_message: MailboxMessage,
214 ) -> Result<DeregisterSignalReceiver> {
215 let pusher_id = pusher_id.string_key();
216 let pushers = self.0.read().await;
217 let pusher = pushers
218 .get(&pusher_id)
219 .context(error::PusherNotFoundSnafu { pusher_id })?;
220
221 pusher
222 .push(HeartbeatResponse {
223 header: Some(pusher.header()),
224 mailbox_message: Some(mailbox_message),
225 ..Default::default()
226 })
227 .await?;
228
229 Ok(pusher.deregister_signal_receiver.clone())
230 }
231
232 async fn broadcast(
233 &self,
234 range: Range<String>,
235 mailbox_message: &MailboxMessage,
236 ) -> Result<()> {
237 let pushers = self.0.read().await;
238 let pushers = pushers
239 .range(range)
240 .map(|(_, value)| value)
241 .collect::<Vec<_>>();
242 let mut results = Vec::with_capacity(pushers.len());
243
244 for pusher in pushers {
245 let mut mailbox_message = mailbox_message.clone();
246 mailbox_message.id = 0; results.push(pusher.push(HeartbeatResponse {
249 header: Some(pusher.header()),
250 mailbox_message: Some(mailbox_message),
251 ..Default::default()
252 }))
253 }
254
255 let _ = join_all(results)
257 .await
258 .into_iter()
259 .collect::<Result<Vec<_>>>()?;
260
261 Ok(())
262 }
263
264 pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
265 self.0.write().await.insert(pusher_id, pusher)
266 }
267
268 async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
269 self.0.write().await.remove(pusher_id)
270 }
271}
272
273#[derive(Clone)]
274pub struct NameCachedHandler {
275 name: &'static str,
276 handler: Arc<dyn HeartbeatHandler>,
277}
278
279impl NameCachedHandler {
280 fn new(handler: impl HeartbeatHandler + 'static) -> Self {
281 let name = handler.name();
282 let handler = Arc::new(handler);
283 Self { name, handler }
284 }
285}
286
287pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
288
289#[derive(Default, Clone)]
291pub struct HeartbeatHandlerGroup {
292 handlers: Vec<NameCachedHandler>,
293 pushers: Pushers,
294}
295
296impl HeartbeatHandlerGroup {
297 pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
299 METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
300 info!("Pusher register: {}", pusher_id);
301 let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
302 }
303
304 pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
308 METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
309 info!("Pusher unregister: {}", pusher_id);
310 self.pushers.remove(&pusher_id.string_key()).await
311 }
312
313 pub fn pushers(&self) -> Pushers {
315 self.pushers.clone()
316 }
317
318 pub async fn handle(
320 &self,
321 req: HeartbeatRequest,
322 mut ctx: Context,
323 ) -> Result<HeartbeatResponse> {
324 let mut acc = HeartbeatAccumulator::default();
325 let role = req
326 .header
327 .as_ref()
328 .and_then(|h| Role::try_from(h.role).ok())
329 .context(error::InvalidArgumentsSnafu {
330 err_msg: format!("invalid role: {:?}", req.header),
331 })?;
332
333 for NameCachedHandler { name, handler } in self.handlers.iter() {
334 if !handler.is_acceptable(role) {
335 continue;
336 }
337
338 let _timer = METRIC_META_HANDLER_EXECUTE
339 .with_label_values(&[*name])
340 .start_timer();
341
342 if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
343 break;
344 }
345 }
346 let header = std::mem::take(&mut acc.header);
347 let res = HeartbeatResponse {
348 header,
349 region_lease: acc.region_lease,
350 ..Default::default()
351 };
352 Ok(res)
353 }
354}
355
356pub struct HeartbeatMailbox {
357 pushers: Pushers,
358 sequence: Sequence,
359 senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
360 timeouts: DashMap<MessageId, Instant>,
361 timeout_notify: Notify,
362}
363
364impl HeartbeatMailbox {
365 pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
366 let Payload::Json(payload) =
367 msg.payload
368 .as_ref()
369 .with_context(|| UnexpectedInstructionReplySnafu {
370 mailbox_message: msg.to_string(),
371 reason: format!("empty payload, msg: {msg:?}"),
372 })?;
373 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
374 }
375
376 #[cfg(test)]
378 pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
379 let Payload::Json(payload) =
380 msg.payload
381 .as_ref()
382 .with_context(|| UnexpectedInstructionReplySnafu {
383 mailbox_message: msg.to_string(),
384 reason: format!("empty payload, msg: {msg:?}"),
385 })?;
386 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
387 }
388
389 pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
390 let mailbox = Arc::new(Self::new(pushers, sequence));
391
392 let timeout_checker = mailbox.clone();
393 let _handle = common_runtime::spawn_global(async move {
394 timeout_checker.check_timeout_bg(10).await;
395 });
396
397 mailbox
398 }
399
400 fn new(pushers: Pushers, sequence: Sequence) -> Self {
401 Self {
402 pushers,
403 sequence,
404 senders: DashMap::default(),
405 timeouts: DashMap::default(),
406 timeout_notify: Notify::new(),
407 }
408 }
409
410 async fn check_timeout_bg(&self, interval_millis: u64) {
411 let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
412
413 loop {
414 let _ = interval.tick().await;
415
416 if self.timeouts.is_empty() {
417 self.timeout_notify.notified().await;
418 }
419
420 let now = Instant::now();
421 let timeout_ids = self
422 .timeouts
423 .iter()
424 .filter_map(|entry| {
425 let (id, deadline) = entry.pair();
426 if deadline < &now {
427 Some(*id)
428 } else {
429 None
430 }
431 })
432 .collect::<Vec<_>>();
433
434 for id in timeout_ids {
435 let _ = self
436 .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
437 .await;
438 }
439 }
440 }
441
442 #[inline]
443 async fn next_message_id(&self) -> Result<u64> {
444 loop {
447 let next = self
448 .sequence
449 .next()
450 .await
451 .context(error::NextSequenceSnafu)?;
452 if next > 0 {
453 return Ok(next);
454 }
455 }
456 }
457}
458
459#[async_trait::async_trait]
460impl Mailbox for HeartbeatMailbox {
461 async fn send(
462 &self,
463 ch: &Channel,
464 mut msg: MailboxMessage,
465 timeout: Duration,
466 ) -> Result<MailboxReceiver> {
467 let message_id = self.next_message_id().await?;
468 msg.id = message_id;
469
470 let pusher_id = ch.pusher_id();
471 debug!("Sending mailbox message {msg:?} to {pusher_id}");
472
473 let (tx, rx) = oneshot::channel();
474 let _ = self.senders.insert(message_id, tx);
475 let deadline = Instant::now() + timeout;
476 self.timeouts.insert(message_id, deadline);
477 self.timeout_notify.notify_one();
478
479 let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
480
481 Ok(MailboxReceiver::new(
482 message_id,
483 rx,
484 deregister_signal_receiver,
485 *ch,
486 ))
487 }
488
489 async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
490 let message_id = 0; msg.id = message_id;
492
493 let pusher_id = ch.pusher_id();
494 debug!("Sending mailbox message {msg:?} to {pusher_id}");
495
496 self.pushers.push(pusher_id, msg).await?;
497
498 Ok(())
499 }
500
501 async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
502 self.pushers.broadcast(ch.pusher_range(), msg).await
503 }
504
505 async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
506 debug!("Received mailbox message {maybe_msg:?}");
507
508 let _ = self.timeouts.remove(&id);
509
510 if let Some((_, tx)) = self.senders.remove(&id) {
511 tx.send(maybe_msg)
512 .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
513 } else if let Ok(finally_msg) = maybe_msg {
514 warn!("The response arrived too late: {finally_msg:?}");
515 }
516
517 Ok(())
518 }
519}
520
521pub struct HeartbeatHandlerGroupBuilder {
523 region_failure_handler: Option<RegionFailureHandler>,
525
526 region_lease_handler: Option<RegionLeaseHandler>,
528
529 flush_stats_factor: Option<usize>,
533 flow_state_handler: Option<FlowStateHandler>,
535
536 plugins: Option<Plugins>,
538
539 pushers: Pushers,
541
542 handlers: Vec<NameCachedHandler>,
544}
545
546impl HeartbeatHandlerGroupBuilder {
547 pub fn new(pushers: Pushers) -> Self {
548 Self {
549 region_failure_handler: None,
550 region_lease_handler: None,
551 flush_stats_factor: None,
552 flow_state_handler: None,
553 plugins: None,
554 pushers,
555 handlers: vec![],
556 }
557 }
558
559 pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
560 self.flow_state_handler = handler;
561 self
562 }
563
564 pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
565 self.region_lease_handler = handler;
566 self
567 }
568
569 pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
571 self.region_failure_handler = handler;
572 self
573 }
574
575 pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
577 self.flush_stats_factor = flush_stats_factor;
578 self
579 }
580
581 pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
583 self.plugins = plugins;
584 self
585 }
586
587 pub fn add_default_handlers(mut self) -> Self {
589 let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
591 plugins
592 .get::<PublisherRef>()
593 .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
594 } else {
595 None
596 };
597
598 self.add_handler_last(ResponseHeaderHandler);
599 self.add_handler_last(DatanodeKeepLeaseHandler);
603 self.add_handler_last(FlownodeKeepLeaseHandler);
604 self.add_handler_last(CheckLeaderHandler);
605 self.add_handler_last(OnLeaderStartHandler);
606 self.add_handler_last(ExtractStatHandler);
607 self.add_handler_last(CollectDatanodeClusterInfoHandler);
608 self.add_handler_last(CollectFrontendClusterInfoHandler);
609 self.add_handler_last(CollectFlownodeClusterInfoHandler);
610 self.add_handler_last(MailboxHandler);
611 if let Some(region_lease_handler) = self.region_lease_handler.take() {
612 self.add_handler_last(region_lease_handler);
613 }
614 self.add_handler_last(FilterInactiveRegionStatsHandler);
615 if let Some(region_failure_handler) = self.region_failure_handler.take() {
616 self.add_handler_last(region_failure_handler);
617 }
618 if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
619 self.add_handler_last(publish_heartbeat_handler);
620 }
621 self.add_handler_last(CollectLeaderRegionHandler);
622 self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
623 self.add_handler_last(RemapFlowPeerHandler::default());
624
625 if let Some(flow_state_handler) = self.flow_state_handler.take() {
626 self.add_handler_last(flow_state_handler);
627 }
628
629 self
630 }
631
632 pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
636 if let Some(customizer) = self
637 .plugins
638 .as_ref()
639 .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
640 {
641 debug!("Customizing the heartbeat handler group builder");
642 customizer.customize(&mut self)?;
643 }
644
645 Ok(HeartbeatHandlerGroup {
646 handlers: self.handlers,
647 pushers: self.pushers,
648 })
649 }
650
651 fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
652 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
653 self.handlers.insert(pos + 1, handler);
654 return Ok(());
655 }
656
657 error::HandlerNotFoundSnafu { name: target }.fail()
658 }
659
660 pub fn add_handler_after(
662 &mut self,
663 target: &'static str,
664 handler: impl HeartbeatHandler + 'static,
665 ) -> Result<()> {
666 self.add_handler_after_inner(target, NameCachedHandler::new(handler))
667 }
668
669 fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
670 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
671 self.handlers.insert(pos, handler);
672 return Ok(());
673 }
674
675 error::HandlerNotFoundSnafu { name: target }.fail()
676 }
677
678 pub fn add_handler_before(
680 &mut self,
681 target: &'static str,
682 handler: impl HeartbeatHandler + 'static,
683 ) -> Result<()> {
684 self.add_handler_before_inner(target, NameCachedHandler::new(handler))
685 }
686
687 fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
688 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
689 self.handlers[pos] = handler;
690 return Ok(());
691 }
692
693 error::HandlerNotFoundSnafu { name: target }.fail()
694 }
695
696 pub fn replace_handler(
698 &mut self,
699 target: &'static str,
700 handler: impl HeartbeatHandler + 'static,
701 ) -> Result<()> {
702 self.replace_handler_inner(target, NameCachedHandler::new(handler))
703 }
704
705 fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
706 self.handlers.push(handler);
707 }
708
709 fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
710 self.add_handler_last_inner(NameCachedHandler::new(handler));
711 }
712}
713
714pub type HeartbeatHandlerGroupBuilderCustomizerRef =
715 Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
716
717pub enum CustomizeHeartbeatGroupAction {
718 AddHandlerAfter {
719 target: String,
720 handler: NameCachedHandler,
721 },
722 AddHandlerBefore {
723 target: String,
724 handler: NameCachedHandler,
725 },
726 ReplaceHandler {
727 target: String,
728 handler: NameCachedHandler,
729 },
730 AddHandlerLast {
731 handler: NameCachedHandler,
732 },
733}
734
735impl CustomizeHeartbeatGroupAction {
736 pub fn new_add_handler_after(
737 target: &'static str,
738 handler: impl HeartbeatHandler + 'static,
739 ) -> Self {
740 Self::AddHandlerAfter {
741 target: target.to_string(),
742 handler: NameCachedHandler::new(handler),
743 }
744 }
745
746 pub fn new_add_handler_before(
747 target: &'static str,
748 handler: impl HeartbeatHandler + 'static,
749 ) -> Self {
750 Self::AddHandlerBefore {
751 target: target.to_string(),
752 handler: NameCachedHandler::new(handler),
753 }
754 }
755
756 pub fn new_replace_handler(
757 target: &'static str,
758 handler: impl HeartbeatHandler + 'static,
759 ) -> Self {
760 Self::ReplaceHandler {
761 target: target.to_string(),
762 handler: NameCachedHandler::new(handler),
763 }
764 }
765
766 pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
767 Self::AddHandlerLast {
768 handler: NameCachedHandler::new(handler),
769 }
770 }
771}
772
773pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
775 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
776
777 fn add_action(&self, action: CustomizeHeartbeatGroupAction);
778}
779
780#[derive(Default)]
781pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
782 actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
783}
784
785impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
786 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
787 info!("Customizing the heartbeat handler group builder");
788 let mut actions = self.actions.lock().unwrap();
789 for action in actions.drain(..) {
790 match action {
791 CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
792 builder.add_handler_after_inner(&target, handler)?;
793 }
794 CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
795 builder.add_handler_before_inner(&target, handler)?;
796 }
797 CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
798 builder.replace_handler_inner(&target, handler)?;
799 }
800 CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
801 builder.add_handler_last_inner(handler);
802 }
803 }
804 }
805 Ok(())
806 }
807
808 fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
809 self.actions.lock().unwrap().push(action);
810 }
811}
812
813#[cfg(test)]
814mod tests {
815
816 use std::assert_matches::assert_matches;
817 use std::sync::Arc;
818 use std::time::Duration;
819
820 use api::v1::meta::{MailboxMessage, Role};
821 use common_meta::kv_backend::memory::MemoryKvBackend;
822 use common_meta::sequence::SequenceBuilder;
823 use tokio::sync::mpsc;
824
825 use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
826 use crate::error;
827 use crate::handler::collect_stats_handler::CollectStatsHandler;
828 use crate::handler::response_header_handler::ResponseHeaderHandler;
829 use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
830 use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
831
832 #[tokio::test]
833 async fn test_mailbox() {
834 let (mailbox, receiver) = push_msg_via_mailbox().await;
835 let id = receiver.message_id();
836
837 let resp_msg = MailboxMessage {
838 id,
839 subject: "resp-test".to_string(),
840 timestamp_millis: 456,
841 ..Default::default()
842 };
843
844 mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
845
846 let recv_msg = receiver.await.unwrap();
847 assert_eq!(recv_msg.id, id);
848 assert_eq!(recv_msg.timestamp_millis, 456);
849 assert_eq!(recv_msg.subject, "resp-test".to_string());
850 }
851
852 #[tokio::test]
853 async fn test_mailbox_timeout() {
854 let (_, receiver) = push_msg_via_mailbox().await;
855 let res = receiver.await;
856 assert!(res.is_err());
857 }
858
859 async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
860 let datanode_id = 12;
861 let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
862 let pusher_id = PusherId::new(Role::Datanode, datanode_id);
863 let pusher: Pusher = Pusher::new(pusher_tx);
864 let handler_group = HeartbeatHandlerGroup::default();
865 handler_group.register_pusher(pusher_id, pusher).await;
866
867 let kv_backend = Arc::new(MemoryKvBackend::new());
868 let seq = SequenceBuilder::new("test_seq", kv_backend).build();
869 let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
870
871 let msg = MailboxMessage {
872 id: 0,
873 subject: "req-test".to_string(),
874 timestamp_millis: 123,
875 ..Default::default()
876 };
877 let ch = Channel::Datanode(datanode_id);
878
879 let receiver = mailbox
880 .send(&ch, msg, Duration::from_secs(1))
881 .await
882 .unwrap();
883
884 let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
885 let message = recv_obj.mailbox_message.unwrap();
886 assert_eq!(message.timestamp_millis, 123);
887 assert_eq!(message.subject, "req-test".to_string());
888
889 (mailbox, receiver)
890 }
891
892 #[test]
893 fn test_handler_group_builder() {
894 let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
895 .add_default_handlers()
896 .build()
897 .unwrap();
898
899 let handlers = group.handlers;
900 assert_eq!(14, handlers.len());
901
902 let names = [
903 "ResponseHeaderHandler",
904 "DatanodeKeepLeaseHandler",
905 "FlownodeKeepLeaseHandler",
906 "CheckLeaderHandler",
907 "OnLeaderStartHandler",
908 "ExtractStatHandler",
909 "CollectDatanodeClusterInfoHandler",
910 "CollectFrontendClusterInfoHandler",
911 "CollectFlownodeClusterInfoHandler",
912 "MailboxHandler",
913 "FilterInactiveRegionStatsHandler",
914 "CollectLeaderRegionHandler",
915 "CollectStatsHandler",
916 "RemapFlowPeerHandler",
917 ];
918
919 for (handler, name) in handlers.iter().zip(names.into_iter()) {
920 assert_eq!(handler.name, name);
921 }
922 }
923
924 #[test]
925 fn test_handler_group_builder_add_before() {
926 let mut builder =
927 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
928 builder
929 .add_handler_before(
930 "FilterInactiveRegionStatsHandler",
931 CollectStatsHandler::default(),
932 )
933 .unwrap();
934
935 let group = builder.build().unwrap();
936 let handlers = group.handlers;
937 assert_eq!(15, handlers.len());
938
939 let names = [
940 "ResponseHeaderHandler",
941 "DatanodeKeepLeaseHandler",
942 "FlownodeKeepLeaseHandler",
943 "CheckLeaderHandler",
944 "OnLeaderStartHandler",
945 "ExtractStatHandler",
946 "CollectDatanodeClusterInfoHandler",
947 "CollectFrontendClusterInfoHandler",
948 "CollectFlownodeClusterInfoHandler",
949 "MailboxHandler",
950 "CollectStatsHandler",
951 "FilterInactiveRegionStatsHandler",
952 "CollectLeaderRegionHandler",
953 "CollectStatsHandler",
954 "RemapFlowPeerHandler",
955 ];
956
957 for (handler, name) in handlers.iter().zip(names.into_iter()) {
958 assert_eq!(handler.name, name);
959 }
960 }
961
962 #[test]
963 fn test_handler_group_builder_add_before_first() {
964 let mut builder =
965 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
966 builder
967 .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
968 .unwrap();
969
970 let group = builder.build().unwrap();
971 let handlers = group.handlers;
972 assert_eq!(15, handlers.len());
973
974 let names = [
975 "CollectStatsHandler",
976 "ResponseHeaderHandler",
977 "DatanodeKeepLeaseHandler",
978 "FlownodeKeepLeaseHandler",
979 "CheckLeaderHandler",
980 "OnLeaderStartHandler",
981 "ExtractStatHandler",
982 "CollectDatanodeClusterInfoHandler",
983 "CollectFrontendClusterInfoHandler",
984 "CollectFlownodeClusterInfoHandler",
985 "MailboxHandler",
986 "FilterInactiveRegionStatsHandler",
987 "CollectLeaderRegionHandler",
988 "CollectStatsHandler",
989 "RemapFlowPeerHandler",
990 ];
991
992 for (handler, name) in handlers.iter().zip(names.into_iter()) {
993 assert_eq!(handler.name, name);
994 }
995 }
996
997 #[test]
998 fn test_handler_group_builder_add_after() {
999 let mut builder =
1000 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1001 builder
1002 .add_handler_after("MailboxHandler", CollectStatsHandler::default())
1003 .unwrap();
1004
1005 let group = builder.build().unwrap();
1006 let handlers = group.handlers;
1007 assert_eq!(15, handlers.len());
1008
1009 let names = [
1010 "ResponseHeaderHandler",
1011 "DatanodeKeepLeaseHandler",
1012 "FlownodeKeepLeaseHandler",
1013 "CheckLeaderHandler",
1014 "OnLeaderStartHandler",
1015 "ExtractStatHandler",
1016 "CollectDatanodeClusterInfoHandler",
1017 "CollectFrontendClusterInfoHandler",
1018 "CollectFlownodeClusterInfoHandler",
1019 "MailboxHandler",
1020 "CollectStatsHandler",
1021 "FilterInactiveRegionStatsHandler",
1022 "CollectLeaderRegionHandler",
1023 "CollectStatsHandler",
1024 "RemapFlowPeerHandler",
1025 ];
1026
1027 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1028 assert_eq!(handler.name, name);
1029 }
1030 }
1031
1032 #[test]
1033 fn test_handler_group_builder_add_after_last() {
1034 let mut builder =
1035 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1036 builder
1037 .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
1038 .unwrap();
1039
1040 let group = builder.build().unwrap();
1041 let handlers = group.handlers;
1042 assert_eq!(15, handlers.len());
1043
1044 let names = [
1045 "ResponseHeaderHandler",
1046 "DatanodeKeepLeaseHandler",
1047 "FlownodeKeepLeaseHandler",
1048 "CheckLeaderHandler",
1049 "OnLeaderStartHandler",
1050 "ExtractStatHandler",
1051 "CollectDatanodeClusterInfoHandler",
1052 "CollectFrontendClusterInfoHandler",
1053 "CollectFlownodeClusterInfoHandler",
1054 "MailboxHandler",
1055 "FilterInactiveRegionStatsHandler",
1056 "CollectLeaderRegionHandler",
1057 "CollectStatsHandler",
1058 "ResponseHeaderHandler",
1059 "RemapFlowPeerHandler",
1060 ];
1061
1062 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1063 assert_eq!(handler.name, name);
1064 }
1065 }
1066
1067 #[test]
1068 fn test_handler_group_builder_replace() {
1069 let mut builder =
1070 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1071 builder
1072 .replace_handler("MailboxHandler", CollectStatsHandler::default())
1073 .unwrap();
1074
1075 let group = builder.build().unwrap();
1076 let handlers = group.handlers;
1077 assert_eq!(14, handlers.len());
1078
1079 let names = [
1080 "ResponseHeaderHandler",
1081 "DatanodeKeepLeaseHandler",
1082 "FlownodeKeepLeaseHandler",
1083 "CheckLeaderHandler",
1084 "OnLeaderStartHandler",
1085 "ExtractStatHandler",
1086 "CollectDatanodeClusterInfoHandler",
1087 "CollectFrontendClusterInfoHandler",
1088 "CollectFlownodeClusterInfoHandler",
1089 "CollectStatsHandler",
1090 "FilterInactiveRegionStatsHandler",
1091 "CollectLeaderRegionHandler",
1092 "CollectStatsHandler",
1093 "RemapFlowPeerHandler",
1094 ];
1095
1096 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1097 assert_eq!(handler.name, name);
1098 }
1099 }
1100
1101 #[test]
1102 fn test_handler_group_builder_replace_last() {
1103 let mut builder =
1104 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1105 builder
1106 .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
1107 .unwrap();
1108
1109 let group = builder.build().unwrap();
1110 let handlers = group.handlers;
1111 assert_eq!(14, handlers.len());
1112
1113 let names = [
1114 "ResponseHeaderHandler",
1115 "DatanodeKeepLeaseHandler",
1116 "FlownodeKeepLeaseHandler",
1117 "CheckLeaderHandler",
1118 "OnLeaderStartHandler",
1119 "ExtractStatHandler",
1120 "CollectDatanodeClusterInfoHandler",
1121 "CollectFrontendClusterInfoHandler",
1122 "CollectFlownodeClusterInfoHandler",
1123 "MailboxHandler",
1124 "FilterInactiveRegionStatsHandler",
1125 "CollectLeaderRegionHandler",
1126 "ResponseHeaderHandler",
1127 "RemapFlowPeerHandler",
1128 ];
1129
1130 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1131 assert_eq!(handler.name, name);
1132 }
1133 }
1134
1135 #[test]
1136 fn test_handler_group_builder_replace_first() {
1137 let mut builder =
1138 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1139 builder
1140 .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
1141 .unwrap();
1142
1143 let group = builder.build().unwrap();
1144 let handlers = group.handlers;
1145 assert_eq!(14, handlers.len());
1146
1147 let names = [
1148 "CollectStatsHandler",
1149 "DatanodeKeepLeaseHandler",
1150 "FlownodeKeepLeaseHandler",
1151 "CheckLeaderHandler",
1152 "OnLeaderStartHandler",
1153 "ExtractStatHandler",
1154 "CollectDatanodeClusterInfoHandler",
1155 "CollectFrontendClusterInfoHandler",
1156 "CollectFlownodeClusterInfoHandler",
1157 "MailboxHandler",
1158 "FilterInactiveRegionStatsHandler",
1159 "CollectLeaderRegionHandler",
1160 "CollectStatsHandler",
1161 "RemapFlowPeerHandler",
1162 ];
1163
1164 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1165 assert_eq!(handler.name, name);
1166 }
1167 }
1168
1169 #[test]
1170 fn test_handler_group_builder_handler_not_found() {
1171 let mut builder =
1172 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1173 let err = builder
1174 .add_handler_before("NotExists", CollectStatsHandler::default())
1175 .unwrap_err();
1176 assert_matches!(err, error::Error::HandlerNotFound { .. });
1177
1178 let err = builder
1179 .add_handler_after("NotExists", CollectStatsHandler::default())
1180 .unwrap_err();
1181 assert_matches!(err, error::Error::HandlerNotFound { .. });
1182
1183 let err = builder
1184 .replace_handler("NotExists", CollectStatsHandler::default())
1185 .unwrap_err();
1186 assert_matches!(err, error::Error::HandlerNotFound { .. });
1187 }
1188
1189 #[tokio::test]
1190 async fn test_pusher_drop() {
1191 let (tx, _rx) = mpsc::channel(1);
1192 let pusher = Pusher::new(tx);
1193 let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1194
1195 drop(pusher);
1196 deregister_signal_tx.changed().await.unwrap();
1197 }
1198}