meta_srv/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23    HeartbeatRequest, HeartbeatResponse, MailboxMessage, PROTOCOL_VERSION, RegionLease,
24    ResponseHeader, Role,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28    CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29    CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::InstructionReply;
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{Notify, RwLock, oneshot, watch};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63    BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89    fn is_acceptable(&self, role: Role) -> bool;
90
91    fn name(&self) -> &'static str {
92        let type_name = std::any::type_name::<Self>();
93        // short name
94        type_name.split("::").last().unwrap_or(type_name)
95    }
96
97    async fn handle(
98        &self,
99        req: &HeartbeatRequest,
100        ctx: &mut Context,
101        acc: &mut HeartbeatAccumulator,
102    ) -> Result<HandleControl>;
103}
104
/// HandleControl
///
/// Controls the process of handling a heartbeat request: each handler returns
/// one of these values to say whether the pipeline should keep going.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum HandleControl {
    /// Proceed with the next handler.
    Continue,
    /// Stop processing; no further handler is invoked for this request.
    Done,
}
113
114#[derive(Debug, Default)]
115pub struct HeartbeatAccumulator {
116    pub header: Option<ResponseHeader>,
117    mailbox_message: Option<MailboxMessage>,
118    pub stat: Option<Stat>,
119    pub inactive_region_ids: HashSet<RegionId>,
120    pub region_lease: Option<RegionLease>,
121}
122
123impl HeartbeatAccumulator {
124    pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
125        self.mailbox_message.take()
126    }
127
128    pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
129        let _ = self.mailbox_message.insert(message);
130    }
131}
132
133#[derive(Copy, Clone)]
134pub struct PusherId {
135    pub role: Role,
136    pub id: u64,
137}
138
139impl Debug for PusherId {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        write!(f, "{:?}-{}", self.role, self.id)
142    }
143}
144
145impl Display for PusherId {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        write!(f, "{:?}-{}", self.role, self.id)
148    }
149}
150
151impl PusherId {
152    pub fn new(role: Role, id: u64) -> Self {
153        Self { role, id }
154    }
155
156    pub fn string_key(&self) -> String {
157        format!("{}-{}", self.role as i32, self.id)
158    }
159}
160
161/// The receiver of the deregister signal.
162pub type DeregisterSignalReceiver = watch::Receiver<bool>;
163
164/// The pusher of the heartbeat response.
165pub struct Pusher {
166    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
167    // The sender of the deregister signal.
168    // default is false, means the pusher is not deregistered.
169    // when the pusher is deregistered, the sender will be notified.
170    deregister_signal_sender: watch::Sender<bool>,
171    deregister_signal_receiver: DeregisterSignalReceiver,
172
173    res_header: ResponseHeader,
174}
175
176impl Drop for Pusher {
177    fn drop(&mut self) {
178        // Ignore the error here.
179        // if all the receivers have been dropped, means no body cares the deregister signal.
180        let _ = self.deregister_signal_sender.send(true);
181    }
182}
183
184impl Pusher {
185    pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
186        let res_header = ResponseHeader {
187            protocol_version: PROTOCOL_VERSION,
188            ..Default::default()
189        };
190        let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
191        Self {
192            sender,
193            deregister_signal_sender,
194            deregister_signal_receiver,
195            res_header,
196        }
197    }
198
199    #[inline]
200    pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
201        self.sender.send(Ok(res)).await.map_err(|e| {
202            error::PushMessageSnafu {
203                err_msg: e.to_string(),
204            }
205            .build()
206        })
207    }
208
209    #[inline]
210    pub fn header(&self) -> ResponseHeader {
211        self.res_header.clone()
212    }
213}
214
215/// The group of heartbeat pushers.
216#[derive(Clone, Default)]
217pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
218
219impl Pushers {
220    async fn push(
221        &self,
222        pusher_id: PusherId,
223        mailbox_message: MailboxMessage,
224    ) -> Result<DeregisterSignalReceiver> {
225        let pusher_id = pusher_id.string_key();
226        let pushers = self.0.read().await;
227        let pusher = pushers
228            .get(&pusher_id)
229            .context(error::PusherNotFoundSnafu { pusher_id })?;
230
231        pusher
232            .push(HeartbeatResponse {
233                header: Some(pusher.header()),
234                mailbox_message: Some(mailbox_message),
235                ..Default::default()
236            })
237            .await?;
238
239        Ok(pusher.deregister_signal_receiver.clone())
240    }
241
242    async fn broadcast(
243        &self,
244        range: Range<String>,
245        mailbox_message: &MailboxMessage,
246    ) -> Result<()> {
247        let pushers = self.0.read().await;
248        let pushers = pushers
249            .range(range)
250            .map(|(_, value)| value)
251            .collect::<Vec<_>>();
252        let mut results = Vec::with_capacity(pushers.len());
253
254        for pusher in pushers {
255            let mut mailbox_message = mailbox_message.clone();
256            mailbox_message.id = 0; // one-way message
257
258            results.push(pusher.push(HeartbeatResponse {
259                header: Some(pusher.header()),
260                mailbox_message: Some(mailbox_message),
261                ..Default::default()
262            }))
263        }
264
265        // Checks the error out of the loop.
266        let _ = join_all(results)
267            .await
268            .into_iter()
269            .collect::<Result<Vec<_>>>()?;
270
271        Ok(())
272    }
273
274    pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
275        self.0.write().await.insert(pusher_id, pusher)
276    }
277
278    async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
279        self.0.write().await.remove(pusher_id)
280    }
281
282    pub(crate) async fn clear(&self) -> Vec<String> {
283        let mut pushers = self.0.write().await;
284        let keys = pushers.keys().cloned().collect::<Vec<_>>();
285        if !keys.is_empty() {
286            pushers.clear();
287        }
288        keys
289    }
290}
291
292#[derive(Clone)]
293pub struct NameCachedHandler {
294    name: &'static str,
295    handler: Arc<dyn HeartbeatHandler>,
296}
297
298impl NameCachedHandler {
299    fn new(handler: impl HeartbeatHandler + 'static) -> Self {
300        let name = handler.name();
301        let handler = Arc::new(handler);
302        Self { name, handler }
303    }
304}
305
306pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
307
308/// The group of heartbeat handlers.
309#[derive(Default, Clone)]
310pub struct HeartbeatHandlerGroup {
311    handlers: Vec<NameCachedHandler>,
312    pushers: Pushers,
313}
314
315impl HeartbeatHandlerGroup {
316    /// Registers the heartbeat response [`Pusher`] with the given key to the group.
317    pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
318        METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
319        info!("Pusher register: {}", pusher_id);
320        let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
321    }
322
323    /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
324    pub async fn deregister_push(&self, pusher_id: PusherId) {
325        info!("Pusher unregister: {}", pusher_id);
326        if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
327            METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
328        }
329    }
330
331    /// Returns the [`Pushers`] of the group.
332    pub fn pushers(&self) -> Pushers {
333        self.pushers.clone()
334    }
335
336    /// Handles the heartbeat request.
337    pub async fn handle(
338        &self,
339        req: HeartbeatRequest,
340        mut ctx: Context,
341    ) -> Result<HeartbeatResponse> {
342        let mut acc = HeartbeatAccumulator::default();
343        let role = req
344            .header
345            .as_ref()
346            .and_then(|h| Role::try_from(h.role).ok())
347            .context(error::InvalidArgumentsSnafu {
348                err_msg: format!("invalid role: {:?}", req.header),
349            })?;
350
351        let is_handshake = ctx.is_handshake;
352
353        for NameCachedHandler { name, handler } in self.handlers.iter() {
354            if !handler.is_acceptable(role) {
355                continue;
356            }
357
358            let _timer = METRIC_META_HANDLER_EXECUTE
359                .with_label_values(&[*name])
360                .start_timer();
361
362            if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
363                break;
364            }
365        }
366        let header = std::mem::take(&mut acc.header);
367        let mailbox_message = acc.take_mailbox_message();
368
369        // Populate heartbeat_config during handshake
370        let heartbeat_config = if is_handshake {
371            let config = ctx.heartbeat_options_for(role).into();
372
373            info!(
374                "Handshake with {:?} node, sending config: {:?}",
375                role, config
376            );
377
378            Some(config)
379        } else {
380            None
381        };
382
383        let res = HeartbeatResponse {
384            header,
385            region_lease: acc.region_lease,
386            mailbox_message,
387            heartbeat_config,
388        };
389        Ok(res)
390    }
391}
392
393pub struct HeartbeatMailbox {
394    pushers: Pushers,
395    sequence: Sequence,
396    senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
397    timeouts: DashMap<MessageId, Instant>,
398    timeout_notify: Notify,
399}
400
401impl HeartbeatMailbox {
402    pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
403        let Payload::Json(payload) =
404            msg.payload
405                .as_ref()
406                .with_context(|| UnexpectedInstructionReplySnafu {
407                    mailbox_message: msg.to_string(),
408                    reason: format!("empty payload, msg: {msg:?}"),
409                })?;
410        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
411    }
412
413    /// Parses the [Instruction] from [MailboxMessage].
414    #[cfg(test)]
415    pub(crate) fn json_instruction(
416        msg: &MailboxMessage,
417    ) -> Result<common_meta::instruction::Instruction> {
418        let Payload::Json(payload) =
419            msg.payload
420                .as_ref()
421                .with_context(|| UnexpectedInstructionReplySnafu {
422                    mailbox_message: msg.to_string(),
423                    reason: format!("empty payload, msg: {msg:?}"),
424                })?;
425        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
426    }
427
428    pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
429        let mailbox = Arc::new(Self::new(pushers, sequence));
430
431        let timeout_checker = mailbox.clone();
432        let _handle = common_runtime::spawn_global(async move {
433            timeout_checker.check_timeout_bg(10).await;
434        });
435
436        mailbox
437    }
438
439    fn new(pushers: Pushers, sequence: Sequence) -> Self {
440        Self {
441            pushers,
442            sequence,
443            senders: DashMap::default(),
444            timeouts: DashMap::default(),
445            timeout_notify: Notify::new(),
446        }
447    }
448
449    async fn check_timeout_bg(&self, interval_millis: u64) {
450        let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
451
452        loop {
453            let _ = interval.tick().await;
454
455            if self.timeouts.is_empty() {
456                self.timeout_notify.notified().await;
457            }
458
459            let now = Instant::now();
460            let timeout_ids = self
461                .timeouts
462                .iter()
463                .filter_map(|entry| {
464                    let (id, deadline) = entry.pair();
465                    if deadline < &now { Some(*id) } else { None }
466                })
467                .collect::<Vec<_>>();
468
469            for id in timeout_ids {
470                let _ = self
471                    .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
472                    .await;
473            }
474        }
475    }
476
477    #[inline]
478    async fn next_message_id(&self) -> Result<u64> {
479        // In this implementation, we pre-occupy the message_id of 0,
480        // and we use `message_id = 0` to mark a Message as a one-way call.
481        loop {
482            let next = self
483                .sequence
484                .next()
485                .await
486                .context(error::NextSequenceSnafu)?;
487            if next > 0 {
488                return Ok(next);
489            }
490        }
491    }
492}
493
494#[async_trait::async_trait]
495impl Mailbox for HeartbeatMailbox {
496    async fn send(
497        &self,
498        ch: &Channel,
499        mut msg: MailboxMessage,
500        timeout: Duration,
501    ) -> Result<MailboxReceiver> {
502        let message_id = self.next_message_id().await?;
503        msg.id = message_id;
504
505        let pusher_id = ch.pusher_id();
506        debug!("Sending mailbox message {msg:?} to {pusher_id}");
507
508        let (tx, rx) = oneshot::channel();
509        let _ = self.senders.insert(message_id, tx);
510        let deadline = Instant::now() + timeout;
511        self.timeouts.insert(message_id, deadline);
512        self.timeout_notify.notify_one();
513
514        let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
515
516        Ok(MailboxReceiver::new(
517            message_id,
518            rx,
519            deregister_signal_receiver,
520            *ch,
521        ))
522    }
523
524    async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
525        let message_id = 0; // one-way message, same as `broadcast`
526        msg.id = message_id;
527
528        let pusher_id = ch.pusher_id();
529        debug!("Sending mailbox message {msg:?} to {pusher_id}");
530
531        self.pushers.push(pusher_id, msg).await?;
532
533        Ok(())
534    }
535
536    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
537        self.pushers.broadcast(ch.pusher_range(), msg).await
538    }
539
540    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
541        debug!("Received mailbox message {maybe_msg:?}");
542
543        let _ = self.timeouts.remove(&id);
544
545        if let Some((_, tx)) = self.senders.remove(&id) {
546            tx.send(maybe_msg)
547                .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
548        } else if let Ok(finally_msg) = maybe_msg {
549            warn!("The response arrived too late: {finally_msg:?}");
550        }
551
552        Ok(())
553    }
554
555    async fn reset(&self) {
556        let keys = self.pushers.clear().await;
557        if !keys.is_empty() {
558            info!("Reset mailbox, deregister pushers: {:?}", keys);
559            METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
560        }
561    }
562}
563
564/// The builder to build the group of heartbeat handlers.
565pub struct HeartbeatHandlerGroupBuilder {
566    /// The handler to handle region failure.
567    region_failure_handler: Option<RegionFailureHandler>,
568
569    /// The handler to handle region lease.
570    region_lease_handler: Option<RegionLeaseHandler>,
571
572    /// The factor that determines how often statistics should be flushed,
573    /// based on the number of received heartbeats. When the number of heartbeats
574    /// reaches this factor, a flush operation is triggered.
575    flush_stats_factor: Option<usize>,
576    /// A simple handler for flow internal state report
577    flow_state_handler: Option<FlowStateHandler>,
578
579    /// The handler to persist stats.
580    persist_stats_handler: Option<PersistStatsHandler>,
581
582    /// The plugins.
583    plugins: Option<Plugins>,
584
585    /// The heartbeat response pushers.
586    pushers: Pushers,
587
588    /// The group of heartbeat handlers.
589    handlers: Vec<NameCachedHandler>,
590}
591
592impl HeartbeatHandlerGroupBuilder {
593    pub fn new(pushers: Pushers) -> Self {
594        Self {
595            region_failure_handler: None,
596            region_lease_handler: None,
597            flush_stats_factor: None,
598            flow_state_handler: None,
599            persist_stats_handler: None,
600            plugins: None,
601            pushers,
602            handlers: vec![],
603        }
604    }
605
606    pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
607        self.flow_state_handler = handler;
608        self
609    }
610
611    pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
612        self.region_lease_handler = handler;
613        self
614    }
615
616    /// Sets the [`RegionFailureHandler`].
617    pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
618        self.region_failure_handler = handler;
619        self
620    }
621
622    /// Sets the flush stats factor.
623    pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
624        self.flush_stats_factor = flush_stats_factor;
625        self
626    }
627
628    pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
629        self.persist_stats_handler = handler;
630        self
631    }
632
633    /// Sets the [`Plugins`].
634    pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
635        self.plugins = plugins;
636        self
637    }
638
639    /// Adds the default handlers.
640    pub fn add_default_handlers(mut self) -> Self {
641        // Extract the `PublishHeartbeatHandler` from the plugins.
642        let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
643            plugins
644                .get::<PublisherRef>()
645                .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
646        } else {
647            None
648        };
649
650        self.add_handler_last(ResponseHeaderHandler);
651        // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
652        // because even if the current meta-server node is no longer the leader it can
653        // still help the datanode to keep lease.
654        self.add_handler_last(DatanodeKeepLeaseHandler);
655        self.add_handler_last(FlownodeKeepLeaseHandler);
656        self.add_handler_last(CheckLeaderHandler);
657        self.add_handler_last(OnLeaderStartHandler);
658        self.add_handler_last(ExtractStatHandler);
659        self.add_handler_last(CollectDatanodeClusterInfoHandler);
660        self.add_handler_last(CollectFrontendClusterInfoHandler);
661        self.add_handler_last(CollectFlownodeClusterInfoHandler);
662        self.add_handler_last(MailboxHandler);
663        if let Some(region_lease_handler) = self.region_lease_handler.take() {
664            self.add_handler_last(region_lease_handler);
665        }
666        self.add_handler_last(FilterInactiveRegionStatsHandler);
667        if let Some(region_failure_handler) = self.region_failure_handler.take() {
668            self.add_handler_last(region_failure_handler);
669        }
670        if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
671            self.add_handler_last(publish_heartbeat_handler);
672        }
673        self.add_handler_last(CollectLeaderRegionHandler);
674        self.add_handler_last(CollectTopicStatsHandler);
675        // Persist stats handler should be in front of collect stats handler.
676        // Because collect stats handler will consume the stats from the accumulator.
677        if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
678            self.add_handler_last(persist_stats_handler);
679        }
680        self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
681        self.add_handler_last(RemapFlowPeerHandler::default());
682
683        if let Some(flow_state_handler) = self.flow_state_handler.take() {
684            self.add_handler_last(flow_state_handler);
685        }
686
687        self
688    }
689
690    /// Builds the group of heartbeat handlers.
691    ///
692    /// Applies the customizer if it exists.
693    pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
694        if let Some(customizer) = self
695            .plugins
696            .as_ref()
697            .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
698        {
699            debug!("Customizing the heartbeat handler group builder");
700            customizer.customize(&mut self)?;
701        }
702
703        Ok(HeartbeatHandlerGroup {
704            handlers: self.handlers,
705            pushers: self.pushers,
706        })
707    }
708
709    fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
710        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
711            self.handlers.insert(pos + 1, handler);
712            return Ok(());
713        }
714
715        error::HandlerNotFoundSnafu { name: target }.fail()
716    }
717
718    /// Adds the handler after the specified handler.
719    pub fn add_handler_after(
720        &mut self,
721        target: &'static str,
722        handler: impl HeartbeatHandler + 'static,
723    ) -> Result<()> {
724        self.add_handler_after_inner(target, NameCachedHandler::new(handler))
725    }
726
727    fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
728        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
729            self.handlers.insert(pos, handler);
730            return Ok(());
731        }
732
733        error::HandlerNotFoundSnafu { name: target }.fail()
734    }
735
736    /// Adds the handler before the specified handler.
737    pub fn add_handler_before(
738        &mut self,
739        target: &'static str,
740        handler: impl HeartbeatHandler + 'static,
741    ) -> Result<()> {
742        self.add_handler_before_inner(target, NameCachedHandler::new(handler))
743    }
744
745    fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
746        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
747            self.handlers[pos] = handler;
748            return Ok(());
749        }
750
751        error::HandlerNotFoundSnafu { name: target }.fail()
752    }
753
754    /// Replaces the handler with the specified name.
755    pub fn replace_handler(
756        &mut self,
757        target: &'static str,
758        handler: impl HeartbeatHandler + 'static,
759    ) -> Result<()> {
760        self.replace_handler_inner(target, NameCachedHandler::new(handler))
761    }
762
763    fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
764        self.handlers.push(handler);
765    }
766
767    fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
768        self.add_handler_last_inner(NameCachedHandler::new(handler));
769    }
770}
771
772pub type HeartbeatHandlerGroupBuilderCustomizerRef =
773    Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
774
775pub enum CustomizeHeartbeatGroupAction {
776    AddHandlerAfter {
777        target: String,
778        handler: NameCachedHandler,
779    },
780    AddHandlerBefore {
781        target: String,
782        handler: NameCachedHandler,
783    },
784    ReplaceHandler {
785        target: String,
786        handler: NameCachedHandler,
787    },
788    AddHandlerLast {
789        handler: NameCachedHandler,
790    },
791}
792
793impl CustomizeHeartbeatGroupAction {
794    pub fn new_add_handler_after(
795        target: &'static str,
796        handler: impl HeartbeatHandler + 'static,
797    ) -> Self {
798        Self::AddHandlerAfter {
799            target: target.to_string(),
800            handler: NameCachedHandler::new(handler),
801        }
802    }
803
804    pub fn new_add_handler_before(
805        target: &'static str,
806        handler: impl HeartbeatHandler + 'static,
807    ) -> Self {
808        Self::AddHandlerBefore {
809            target: target.to_string(),
810            handler: NameCachedHandler::new(handler),
811        }
812    }
813
814    pub fn new_replace_handler(
815        target: &'static str,
816        handler: impl HeartbeatHandler + 'static,
817    ) -> Self {
818        Self::ReplaceHandler {
819            target: target.to_string(),
820            handler: NameCachedHandler::new(handler),
821        }
822    }
823
824    pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
825        Self::AddHandlerLast {
826            handler: NameCachedHandler::new(handler),
827        }
828    }
829}
830
831/// The customizer of the [`HeartbeatHandlerGroupBuilder`].
832pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
833    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
834
835    fn add_action(&self, action: CustomizeHeartbeatGroupAction);
836}
837
838#[derive(Default)]
839pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
840    actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
841}
842
843impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
844    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
845        info!("Customizing the heartbeat handler group builder");
846        let mut actions = self.actions.lock().unwrap();
847        for action in actions.drain(..) {
848            match action {
849                CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
850                    builder.add_handler_after_inner(&target, handler)?;
851                }
852                CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
853                    builder.add_handler_before_inner(&target, handler)?;
854                }
855                CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
856                    builder.replace_handler_inner(&target, handler)?;
857                }
858                CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
859                    builder.add_handler_last_inner(handler);
860                }
861            }
862        }
863        Ok(())
864    }
865
866    fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
867        self.actions.lock().unwrap().push(action);
868    }
869}
870
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use api::v1::meta::{MailboxMessage, Role};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;
    use tokio::sync::mpsc;

    use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
    use crate::error;
    use crate::handler::collect_stats_handler::CollectStatsHandler;
    use crate::handler::response_header_handler::ResponseHeaderHandler;
    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
    use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};

    /// Asserts that `group` holds exactly the handlers named in `expected`, in
    /// the same order.
    ///
    /// The length is checked first so that a missing or extra handler is not
    /// silently hidden by `zip` stopping at the shorter side.
    #[track_caller]
    fn assert_handler_names(group: &HeartbeatHandlerGroup, expected: &[&str]) {
        let handlers = &group.handlers;
        assert_eq!(expected.len(), handlers.len(), "handler count mismatch");
        for (position, (handler, name)) in handlers.iter().zip(expected).enumerate() {
            assert_eq!(
                handler.name, *name,
                "unexpected handler at position {position}"
            );
        }
    }

    /// A reply delivered through `on_recv` must reach the receiver returned by
    /// `send`.
    #[tokio::test]
    async fn test_mailbox() {
        let (mailbox, receiver) = push_msg_via_mailbox().await;
        let id = receiver.message_id();

        let resp_msg = MailboxMessage {
            id,
            subject: "resp-test".to_string(),
            timestamp_millis: 456,
            ..Default::default()
        };

        mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();

        let recv_msg = receiver.await.unwrap();
        assert_eq!(recv_msg.id, id);
        assert_eq!(recv_msg.timestamp_millis, 456);
        assert_eq!(recv_msg.subject, "resp-test");
    }

    /// Without any reply, awaiting the receiver must end with an error.
    #[tokio::test]
    async fn test_mailbox_timeout() {
        // The mailbox handle is deliberately not bound to a name here.
        let (_, receiver) = push_msg_via_mailbox().await;
        let res = receiver.await;
        assert!(res.is_err());
    }

    /// Registers a datanode pusher, sends one request through a fresh mailbox
    /// and checks that the pusher channel received it.
    ///
    /// Returns the mailbox together with the receiver for the pending reply.
    async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
        let datanode_id = 12;
        let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
        let pusher_id = PusherId::new(Role::Datanode, datanode_id);
        let pusher: Pusher = Pusher::new(pusher_tx);
        let handler_group = HeartbeatHandlerGroup::default();
        handler_group.register_pusher(pusher_id, pusher).await;

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let seq = SequenceBuilder::new("test_seq", kv_backend).build();
        let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);

        let msg = MailboxMessage {
            id: 0,
            subject: "req-test".to_string(),
            timestamp_millis: 123,
            ..Default::default()
        };
        let ch = Channel::Datanode(datanode_id);

        let receiver = mailbox
            .send(&ch, msg, Duration::from_secs(1))
            .await
            .unwrap();

        let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
        let message = recv_obj.mailbox_message.unwrap();
        assert_eq!(message.timestamp_millis, 123);
        assert_eq!(message.subject, "req-test");

        (mailbox, receiver)
    }

    /// The default handlers are registered in the expected order.
    #[test]
    fn test_handler_group_builder() {
        let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
            .add_default_handlers()
            .build()
            .unwrap();

        assert_handler_names(
            &group,
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// A handler added before a target in the middle lands directly in front
    /// of it.
    #[test]
    fn test_handler_group_builder_add_before() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_before(
                "FilterInactiveRegionStatsHandler",
                CollectStatsHandler::default(),
            )
            .unwrap();

        let group = builder.build().unwrap();
        assert_handler_names(
            &group,
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "CollectStatsHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// A handler added before the first handler becomes the new first one.
    #[test]
    fn test_handler_group_builder_add_before_first() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
            .unwrap();

        let group = builder.build().unwrap();
        assert_handler_names(
            &group,
            &[
                "CollectStatsHandler",
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// A handler added after a target in the middle lands directly behind it.
    #[test]
    fn test_handler_group_builder_add_after() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_after("MailboxHandler", CollectStatsHandler::default())
            .unwrap();

        let group = builder.build().unwrap();
        assert_handler_names(
            &group,
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "CollectStatsHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// A handler added after `CollectStatsHandler` lands directly behind it.
    #[test]
    fn test_handler_group_builder_add_after_last() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
            .unwrap();

        let group = builder.build().unwrap();
        assert_handler_names(
            &group,
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "ResponseHeaderHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// Replacing a handler in the middle keeps every other position intact.
    #[test]
    fn test_handler_group_builder_replace() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .replace_handler("MailboxHandler", CollectStatsHandler::default())
            .unwrap();

        let group = builder.build().unwrap();
        assert_handler_names(
            &group,
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "CollectStatsHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// Replacing `CollectStatsHandler` swaps only that position.
    #[test]
    fn test_handler_group_builder_replace_last() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
            .unwrap();

        let group = builder.build().unwrap();
        assert_handler_names(
            &group,
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "ResponseHeaderHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// Replacing the first handler swaps only that position.
    #[test]
    fn test_handler_group_builder_replace_first() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
            .unwrap();

        let group = builder.build().unwrap();
        assert_handler_names(
            &group,
            &[
                "CollectStatsHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    /// Every positional builder operation reports `HandlerNotFound` for an
    /// unknown target name.
    #[test]
    fn test_handler_group_builder_handler_not_found() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();

        let err = builder
            .add_handler_before("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });

        let err = builder
            .add_handler_after("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });

        let err = builder
            .replace_handler("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });
    }

    /// After the pusher is dropped, a clone of its deregister signal receiver
    /// must observe a change.
    #[tokio::test]
    async fn test_pusher_drop() {
        let (tx, _rx) = mpsc::channel(1);
        let pusher = Pusher::new(tx);
        let mut deregister_signal_rx = pusher.deregister_signal_receiver.clone();

        drop(pusher);
        deregister_signal_rx.changed().await.unwrap();
    }
}