meta_srv/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23    HeartbeatRequest, HeartbeatResponse, MailboxMessage, PROTOCOL_VERSION, RegionLease,
24    ResponseHeader, Role,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28    CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29    CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::InstructionReply;
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{Notify, RwLock, oneshot, watch};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63    BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89    fn is_acceptable(&self, role: Role) -> bool;
90
91    fn name(&self) -> &'static str {
92        let type_name = std::any::type_name::<Self>();
93        // short name
94        type_name.split("::").last().unwrap_or(type_name)
95    }
96
97    async fn handle(
98        &self,
99        req: &HeartbeatRequest,
100        ctx: &mut Context,
101        acc: &mut HeartbeatAccumulator,
102    ) -> Result<HandleControl>;
103}
104
/// HandleControl
///
/// Returned by each [`HeartbeatHandler::handle`] call to tell
/// [`HeartbeatHandlerGroup::handle`] whether to keep running handlers for the
/// current heartbeat request.
#[derive(PartialEq, Debug)]
pub enum HandleControl {
    /// Proceed to the next acceptable handler in the group.
    Continue,
    /// Stop here; the handlers that follow are not run for this request.
    Done,
}
113
/// Per-request scratch state shared by all handlers of a
/// [`HeartbeatHandlerGroup`]; parts of it end up in the `HeartbeatResponse`.
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
    /// Response header; moved into the final `HeartbeatResponse`.
    pub header: Option<ResponseHeader>,
    // Pending mailbox message for the response. Written through
    // `set_mailbox_message` and moved out through `take_mailbox_message`.
    mailbox_message: Option<MailboxMessage>,
    /// Stat for the current heartbeat.
    // NOTE(review): presumably filled by the stat-extraction handler and consumed
    // by the stats-collection handler — confirm against those handlers.
    pub stat: Option<Stat>,
    /// Region ids considered inactive for this heartbeat.
    // NOTE(review): producers/consumers are not visible in this file — confirm.
    pub inactive_region_ids: HashSet<RegionId>,
    /// Region lease; moved into the final `HeartbeatResponse`.
    pub region_lease: Option<RegionLease>,
}
122
123impl HeartbeatAccumulator {
124    pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
125        self.mailbox_message.take()
126    }
127
128    pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
129        let _ = self.mailbox_message.insert(message);
130    }
131}
132
/// Identifies a [`Pusher`] by the role of the peer and its numeric id.
///
/// `string_key()` turns it into the string under which the pusher is stored.
#[derive(Copy, Clone)]
pub struct PusherId {
    /// Role of the peer the pusher belongs to.
    pub role: Role,
    /// Numeric id of the peer.
    pub id: u64,
}
138
139impl Debug for PusherId {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        write!(f, "{:?}-{}", self.role, self.id)
142    }
143}
144
145impl Display for PusherId {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        write!(f, "{:?}-{}", self.role, self.id)
148    }
149}
150
151impl PusherId {
152    pub fn new(role: Role, id: u64) -> Self {
153        Self { role, id }
154    }
155
156    pub fn string_key(&self) -> String {
157        format!("{}-{}", self.role as i32, self.id)
158    }
159}
160
/// The receiver of the deregister signal.
///
/// The observed value starts as `false` and becomes `true` once the
/// corresponding [`Pusher`] has been dropped.
pub type DeregisterSignalReceiver = watch::Receiver<bool>;
163
/// The pusher of the heartbeat response.
///
/// Dropping a `Pusher` publishes `true` on its deregister signal.
pub struct Pusher {
    // Channel the responses are sent on; items are
    // `Result<HeartbeatResponse, tonic::Status>`.
    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
    // The sender of the deregister signal.
    // The value starts as `false`, meaning the pusher is not deregistered.
    // When the pusher is dropped, `true` is sent to notify the receivers.
    deregister_signal_sender: watch::Sender<bool>,
    // Receiving half of the deregister signal; clones of it are handed out
    // by `Pushers::push`.
    deregister_signal_receiver: DeregisterSignalReceiver,

    // Header returned by `header()`: carries `PROTOCOL_VERSION`, with every
    // other field left at its default.
    res_header: ResponseHeader,
}
175
176impl Drop for Pusher {
177    fn drop(&mut self) {
178        // Ignore the error here.
179        // if all the receivers have been dropped, means no body cares the deregister signal.
180        let _ = self.deregister_signal_sender.send(true);
181    }
182}
183
184impl Pusher {
185    pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
186        let res_header = ResponseHeader {
187            protocol_version: PROTOCOL_VERSION,
188            ..Default::default()
189        };
190        let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
191        Self {
192            sender,
193            deregister_signal_sender,
194            deregister_signal_receiver,
195            res_header,
196        }
197    }
198
199    #[inline]
200    pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
201        self.sender.send(Ok(res)).await.map_err(|e| {
202            error::PushMessageSnafu {
203                err_msg: e.to_string(),
204            }
205            .build()
206        })
207    }
208
209    #[inline]
210    pub fn header(&self) -> ResponseHeader {
211        self.res_header.clone()
212    }
213}
214
/// The group of heartbeat pushers.
///
/// A cheaply clonable handle (shared `Arc`) to an ordered map from
/// `PusherId::string_key()` strings to their [`Pusher`]; the ordering is what
/// lets `broadcast` address a key range.
#[derive(Clone, Default)]
pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
218
219impl Pushers {
220    async fn push(
221        &self,
222        pusher_id: PusherId,
223        mailbox_message: MailboxMessage,
224    ) -> Result<DeregisterSignalReceiver> {
225        let pusher_id = pusher_id.string_key();
226        let pushers = self.0.read().await;
227        let pusher = pushers
228            .get(&pusher_id)
229            .context(error::PusherNotFoundSnafu { pusher_id })?;
230
231        pusher
232            .push(HeartbeatResponse {
233                header: Some(pusher.header()),
234                mailbox_message: Some(mailbox_message),
235                ..Default::default()
236            })
237            .await?;
238
239        Ok(pusher.deregister_signal_receiver.clone())
240    }
241
242    async fn broadcast(
243        &self,
244        range: Range<String>,
245        mailbox_message: &MailboxMessage,
246    ) -> Result<()> {
247        let pushers = self.0.read().await;
248        let pushers = pushers
249            .range(range)
250            .map(|(_, value)| value)
251            .collect::<Vec<_>>();
252        let mut results = Vec::with_capacity(pushers.len());
253
254        for pusher in pushers {
255            let mut mailbox_message = mailbox_message.clone();
256            mailbox_message.id = 0; // one-way message
257
258            results.push(pusher.push(HeartbeatResponse {
259                header: Some(pusher.header()),
260                mailbox_message: Some(mailbox_message),
261                ..Default::default()
262            }))
263        }
264
265        // Checks the error out of the loop.
266        let _ = join_all(results)
267            .await
268            .into_iter()
269            .collect::<Result<Vec<_>>>()?;
270
271        Ok(())
272    }
273
274    pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
275        self.0.write().await.insert(pusher_id, pusher)
276    }
277
278    async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
279        self.0.write().await.remove(pusher_id)
280    }
281
282    pub(crate) async fn clear(&self) -> Vec<String> {
283        let mut pushers = self.0.write().await;
284        let keys = pushers.keys().cloned().collect::<Vec<_>>();
285        if !keys.is_empty() {
286            pushers.clear();
287        }
288        keys
289    }
290}
291
/// A [`HeartbeatHandler`] stored together with its name.
///
/// The name is captured once at construction, so later lookups by name and
/// metric labelling do not call `HeartbeatHandler::name` again.
#[derive(Clone)]
pub struct NameCachedHandler {
    // Value of `HeartbeatHandler::name()` taken when the wrapper was created.
    name: &'static str,
    // The wrapped handler, shared behind an `Arc`.
    handler: Arc<dyn HeartbeatHandler>,
}
297
298impl NameCachedHandler {
299    fn new(handler: impl HeartbeatHandler + 'static) -> Self {
300        let name = handler.name();
301        let handler = Arc::new(handler);
302        Self { name, handler }
303    }
304}
305
/// A shared, reference-counted [`HeartbeatHandlerGroup`].
pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
307
/// The group of heartbeat handlers.
///
/// Holds the ordered handler list that `handle` runs for each heartbeat
/// request, plus the registry of response pushers.
#[derive(Default, Clone)]
pub struct HeartbeatHandlerGroup {
    // Handlers in execution order.
    handlers: Vec<NameCachedHandler>,
    // Registered heartbeat response pushers.
    pushers: Pushers,
}
314
315impl HeartbeatHandlerGroup {
316    /// Registers the heartbeat response [`Pusher`] with the given key to the group.
317    pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
318        METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
319        info!("Pusher register: {}", pusher_id);
320        let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
321    }
322
323    /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
324    pub async fn deregister_push(&self, pusher_id: PusherId) {
325        info!("Pusher unregister: {}", pusher_id);
326        if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
327            METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
328        }
329    }
330
331    /// Returns the [`Pushers`] of the group.
332    pub fn pushers(&self) -> Pushers {
333        self.pushers.clone()
334    }
335
336    /// Handles the heartbeat request.
337    pub async fn handle(
338        &self,
339        req: HeartbeatRequest,
340        mut ctx: Context,
341    ) -> Result<HeartbeatResponse> {
342        let mut acc = HeartbeatAccumulator::default();
343        let role = req
344            .header
345            .as_ref()
346            .and_then(|h| Role::try_from(h.role).ok())
347            .context(error::InvalidArgumentsSnafu {
348                err_msg: format!("invalid role: {:?}", req.header),
349            })?;
350
351        let is_handshake = ctx.is_handshake;
352
353        for NameCachedHandler { name, handler } in self.handlers.iter() {
354            if !handler.is_acceptable(role) {
355                continue;
356            }
357
358            let _timer = METRIC_META_HANDLER_EXECUTE
359                .with_label_values(&[*name])
360                .start_timer();
361
362            if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
363                break;
364            }
365        }
366        let header = std::mem::take(&mut acc.header);
367        let mailbox_message = acc.take_mailbox_message();
368
369        // Populate heartbeat_config during handshake
370        let heartbeat_config = if is_handshake {
371            let config = ctx.heartbeat_options_for(role).into();
372
373            info!(
374                "Handshake with {:?} node, sending config: {:?}",
375                role, config
376            );
377
378            Some(config)
379        } else {
380            None
381        };
382
383        let res = HeartbeatResponse {
384            header,
385            region_lease: acc.region_lease,
386            mailbox_message,
387            heartbeat_config,
388        };
389        Ok(res)
390    }
391}
392
/// A mailbox that delivers messages through the heartbeat response
/// [`Pushers`] and matches replies to pending requests by message id.
pub struct HeartbeatMailbox {
    // Registry used to deliver outgoing messages.
    pushers: Pushers,
    // Source of message ids; id 0 is reserved for one-way messages.
    sequence: Sequence,
    // Reply channel for every message that is still awaiting a response.
    senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
    // Deadline for every message that is still awaiting a response.
    timeouts: DashMap<MessageId, Instant>,
    // Wakes the timeout checker when a deadline is added.
    timeout_notify: Notify,
}
400
401impl HeartbeatMailbox {
402    pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
403        let Payload::Json(payload) =
404            msg.payload
405                .as_ref()
406                .with_context(|| UnexpectedInstructionReplySnafu {
407                    mailbox_message: msg.to_string(),
408                    reason: format!("empty payload, msg: {msg:?}"),
409                })?;
410        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
411    }
412
413    /// Parses the [Instruction] from [MailboxMessage].
414    #[cfg(test)]
415    pub(crate) fn json_instruction(
416        msg: &MailboxMessage,
417    ) -> Result<common_meta::instruction::Instruction> {
418        let Payload::Json(payload) =
419            msg.payload
420                .as_ref()
421                .with_context(|| UnexpectedInstructionReplySnafu {
422                    mailbox_message: msg.to_string(),
423                    reason: format!("empty payload, msg: {msg:?}"),
424                })?;
425        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
426    }
427
428    pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
429        let mailbox = Arc::new(Self::new(pushers, sequence));
430
431        let timeout_checker = mailbox.clone();
432        let _handle = common_runtime::spawn_global(async move {
433            timeout_checker.check_timeout_bg(10).await;
434        });
435
436        mailbox
437    }
438
439    fn new(pushers: Pushers, sequence: Sequence) -> Self {
440        Self {
441            pushers,
442            sequence,
443            senders: DashMap::default(),
444            timeouts: DashMap::default(),
445            timeout_notify: Notify::new(),
446        }
447    }
448
449    async fn check_timeout_bg(&self, interval_millis: u64) {
450        let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
451
452        loop {
453            let _ = interval.tick().await;
454
455            if self.timeouts.is_empty() {
456                self.timeout_notify.notified().await;
457            }
458
459            let now = Instant::now();
460            let timeout_ids = self
461                .timeouts
462                .iter()
463                .filter_map(|entry| {
464                    let (id, deadline) = entry.pair();
465                    if deadline < &now { Some(*id) } else { None }
466                })
467                .collect::<Vec<_>>();
468
469            for id in timeout_ids {
470                let _ = self
471                    .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
472                    .await;
473            }
474        }
475    }
476
477    #[inline]
478    async fn next_message_id(&self) -> Result<u64> {
479        // In this implementation, we pre-occupy the message_id of 0,
480        // and we use `message_id = 0` to mark a Message as a one-way call.
481        loop {
482            let next = self
483                .sequence
484                .next()
485                .await
486                .context(error::NextSequenceSnafu)?;
487            if next > 0 {
488                return Ok(next);
489            }
490        }
491    }
492}
493
494#[async_trait::async_trait]
495impl Mailbox for HeartbeatMailbox {
496    async fn send(
497        &self,
498        ch: &Channel,
499        mut msg: MailboxMessage,
500        timeout: Duration,
501    ) -> Result<MailboxReceiver> {
502        let message_id = self.next_message_id().await?;
503        msg.id = message_id;
504
505        let pusher_id = ch.pusher_id();
506        debug!("Sending mailbox message {msg:?} to {pusher_id}");
507
508        let (tx, rx) = oneshot::channel();
509        let _ = self.senders.insert(message_id, tx);
510        let deadline = Instant::now() + timeout;
511        self.timeouts.insert(message_id, deadline);
512        self.timeout_notify.notify_one();
513        let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
514
515        Ok(MailboxReceiver::new(
516            message_id,
517            rx,
518            deregister_signal_receiver,
519            *ch,
520        ))
521    }
522
523    async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
524        let message_id = 0; // one-way message, same as `broadcast`
525        msg.id = message_id;
526
527        let pusher_id = ch.pusher_id();
528        debug!("Sending mailbox message {msg:?} to {pusher_id}");
529
530        self.pushers.push(pusher_id, msg).await?;
531
532        Ok(())
533    }
534
535    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
536        self.pushers.broadcast(ch.pusher_range(), msg).await
537    }
538
539    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
540        debug!("Received mailbox message {maybe_msg:?}");
541
542        let _ = self.timeouts.remove(&id);
543
544        if let Some((_, tx)) = self.senders.remove(&id) {
545            tx.send(maybe_msg)
546                .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
547        } else if let Ok(finally_msg) = maybe_msg {
548            warn!("The response arrived too late: {finally_msg:?}");
549        }
550
551        Ok(())
552    }
553
554    async fn reset(&self) {
555        let keys = self.pushers.clear().await;
556        if !keys.is_empty() {
557            info!("Reset mailbox, deregister pushers: {:?}", keys);
558            METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
559        }
560    }
561}
562
/// The builder to build the group of heartbeat handlers.
///
/// The optional handlers below are only consumed by `add_default_handlers`,
/// which appends each one that is set at its place in the default order.
pub struct HeartbeatHandlerGroupBuilder {
    /// The handler to handle region failure.
    region_failure_handler: Option<RegionFailureHandler>,

    /// The handler to handle region lease.
    region_lease_handler: Option<RegionLeaseHandler>,

    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    flush_stats_factor: Option<usize>,
    /// A simple handler for flow internal state reports.
    flow_state_handler: Option<FlowStateHandler>,

    /// The handler to persist stats.
    persist_stats_handler: Option<PersistStatsHandler>,

    /// The plugins.
    ///
    /// Looked up for a `PublisherRef` (in `add_default_handlers`) and for a
    /// `HeartbeatHandlerGroupBuilderCustomizerRef` (in `build`).
    plugins: Option<Plugins>,

    /// The heartbeat response pushers.
    pushers: Pushers,

    /// The group of heartbeat handlers, in execution order.
    handlers: Vec<NameCachedHandler>,
}
590
591impl HeartbeatHandlerGroupBuilder {
592    pub fn new(pushers: Pushers) -> Self {
593        Self {
594            region_failure_handler: None,
595            region_lease_handler: None,
596            flush_stats_factor: None,
597            flow_state_handler: None,
598            persist_stats_handler: None,
599            plugins: None,
600            pushers,
601            handlers: vec![],
602        }
603    }
604
605    pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
606        self.flow_state_handler = handler;
607        self
608    }
609
610    pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
611        self.region_lease_handler = handler;
612        self
613    }
614
615    /// Sets the [`RegionFailureHandler`].
616    pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
617        self.region_failure_handler = handler;
618        self
619    }
620
621    /// Sets the flush stats factor.
622    pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
623        self.flush_stats_factor = flush_stats_factor;
624        self
625    }
626
627    pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
628        self.persist_stats_handler = handler;
629        self
630    }
631
632    /// Sets the [`Plugins`].
633    pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
634        self.plugins = plugins;
635        self
636    }
637
638    /// Adds the default handlers.
639    pub fn add_default_handlers(mut self) -> Self {
640        // Extract the `PublishHeartbeatHandler` from the plugins.
641        let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
642            plugins
643                .get::<PublisherRef>()
644                .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
645        } else {
646            None
647        };
648
649        self.add_handler_last(ResponseHeaderHandler);
650        // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
651        // because even if the current meta-server node is no longer the leader it can
652        // still help the datanode to keep lease.
653        self.add_handler_last(DatanodeKeepLeaseHandler);
654        self.add_handler_last(FlownodeKeepLeaseHandler);
655        self.add_handler_last(CheckLeaderHandler);
656        self.add_handler_last(OnLeaderStartHandler);
657        self.add_handler_last(ExtractStatHandler);
658        self.add_handler_last(CollectDatanodeClusterInfoHandler);
659        self.add_handler_last(CollectFrontendClusterInfoHandler);
660        self.add_handler_last(CollectFlownodeClusterInfoHandler);
661        self.add_handler_last(MailboxHandler);
662        if let Some(region_lease_handler) = self.region_lease_handler.take() {
663            self.add_handler_last(region_lease_handler);
664        }
665        self.add_handler_last(FilterInactiveRegionStatsHandler);
666        if let Some(region_failure_handler) = self.region_failure_handler.take() {
667            self.add_handler_last(region_failure_handler);
668        }
669        if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
670            self.add_handler_last(publish_heartbeat_handler);
671        }
672        self.add_handler_last(CollectLeaderRegionHandler);
673        self.add_handler_last(CollectTopicStatsHandler);
674        // Persist stats handler should be in front of collect stats handler.
675        // Because collect stats handler will consume the stats from the accumulator.
676        if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
677            self.add_handler_last(persist_stats_handler);
678        }
679        self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
680        self.add_handler_last(RemapFlowPeerHandler::default());
681
682        if let Some(flow_state_handler) = self.flow_state_handler.take() {
683            self.add_handler_last(flow_state_handler);
684        }
685
686        self
687    }
688
689    /// Builds the group of heartbeat handlers.
690    ///
691    /// Applies the customizer if it exists.
692    pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
693        if let Some(customizer) = self
694            .plugins
695            .as_ref()
696            .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
697        {
698            debug!("Customizing the heartbeat handler group builder");
699            customizer.customize(&mut self)?;
700        }
701
702        Ok(HeartbeatHandlerGroup {
703            handlers: self.handlers,
704            pushers: self.pushers,
705        })
706    }
707
708    fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
709        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
710            self.handlers.insert(pos + 1, handler);
711            return Ok(());
712        }
713
714        error::HandlerNotFoundSnafu { name: target }.fail()
715    }
716
717    /// Adds the handler after the specified handler.
718    pub fn add_handler_after(
719        &mut self,
720        target: &'static str,
721        handler: impl HeartbeatHandler + 'static,
722    ) -> Result<()> {
723        self.add_handler_after_inner(target, NameCachedHandler::new(handler))
724    }
725
726    fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
727        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
728            self.handlers.insert(pos, handler);
729            return Ok(());
730        }
731
732        error::HandlerNotFoundSnafu { name: target }.fail()
733    }
734
735    /// Adds the handler before the specified handler.
736    pub fn add_handler_before(
737        &mut self,
738        target: &'static str,
739        handler: impl HeartbeatHandler + 'static,
740    ) -> Result<()> {
741        self.add_handler_before_inner(target, NameCachedHandler::new(handler))
742    }
743
744    fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
745        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
746            self.handlers[pos] = handler;
747            return Ok(());
748        }
749
750        error::HandlerNotFoundSnafu { name: target }.fail()
751    }
752
753    /// Replaces the handler with the specified name.
754    pub fn replace_handler(
755        &mut self,
756        target: &'static str,
757        handler: impl HeartbeatHandler + 'static,
758    ) -> Result<()> {
759        self.replace_handler_inner(target, NameCachedHandler::new(handler))
760    }
761
762    fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
763        self.handlers.push(handler);
764    }
765
766    fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
767        self.add_handler_last_inner(NameCachedHandler::new(handler));
768    }
769}
770
/// A shared, reference-counted [`HeartbeatHandlerGroupBuilderCustomizer`]
/// trait object; this is the type `HeartbeatHandlerGroupBuilder::build` looks
/// up in the plugins.
pub type HeartbeatHandlerGroupBuilderCustomizerRef =
    Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
773
/// A deferred edit of the handler list of a [`HeartbeatHandlerGroupBuilder`].
///
/// `target` always names an existing handler, matched against the cached
/// handler name.
pub enum CustomizeHeartbeatGroupAction {
    /// Insert `handler` right after the handler named `target`.
    AddHandlerAfter {
        target: String,
        handler: NameCachedHandler,
    },
    /// Insert `handler` right before the handler named `target`.
    AddHandlerBefore {
        target: String,
        handler: NameCachedHandler,
    },
    /// Put `handler` in place of the handler named `target`.
    ReplaceHandler {
        target: String,
        handler: NameCachedHandler,
    },
    /// Append `handler` to the end of the handler list.
    AddHandlerLast {
        handler: NameCachedHandler,
    },
}
791
792impl CustomizeHeartbeatGroupAction {
793    pub fn new_add_handler_after(
794        target: &'static str,
795        handler: impl HeartbeatHandler + 'static,
796    ) -> Self {
797        Self::AddHandlerAfter {
798            target: target.to_string(),
799            handler: NameCachedHandler::new(handler),
800        }
801    }
802
803    pub fn new_add_handler_before(
804        target: &'static str,
805        handler: impl HeartbeatHandler + 'static,
806    ) -> Self {
807        Self::AddHandlerBefore {
808            target: target.to_string(),
809            handler: NameCachedHandler::new(handler),
810        }
811    }
812
813    pub fn new_replace_handler(
814        target: &'static str,
815        handler: impl HeartbeatHandler + 'static,
816    ) -> Self {
817        Self::ReplaceHandler {
818            target: target.to_string(),
819            handler: NameCachedHandler::new(handler),
820        }
821    }
822
823    pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
824        Self::AddHandlerLast {
825            handler: NameCachedHandler::new(handler),
826        }
827    }
828}
829
/// The customizer of the [`HeartbeatHandlerGroupBuilder`].
pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
    /// Applies the customization to `builder`.
    ///
    /// `HeartbeatHandlerGroupBuilder::build` calls this before it assembles
    /// the group and propagates any error returned here.
    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;

    /// Queues `action` for a later `customize` call.
    fn add_action(&self, action: CustomizeHeartbeatGroupAction);
}
836
/// A customizer that records [`CustomizeHeartbeatGroupAction`]s and replays
/// them, in insertion order, on the builder passed to `customize`.
#[derive(Default)]
pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
    // Queued actions; behind a mutex because both trait methods take `&self`.
    actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
}
841
842impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
843    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
844        info!("Customizing the heartbeat handler group builder");
845        let mut actions = self.actions.lock().unwrap();
846        for action in actions.drain(..) {
847            match action {
848                CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
849                    builder.add_handler_after_inner(&target, handler)?;
850                }
851                CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
852                    builder.add_handler_before_inner(&target, handler)?;
853                }
854                CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
855                    builder.replace_handler_inner(&target, handler)?;
856                }
857                CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
858                    builder.add_handler_last_inner(handler);
859                }
860            }
861        }
862        Ok(())
863    }
864
865    fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
866        self.actions.lock().unwrap().push(action);
867    }
868}
869
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use api::v1::meta::{MailboxMessage, Role};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;
    use tokio::sync::mpsc;

    use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
    use crate::error;
    use crate::handler::collect_stats_handler::CollectStatsHandler;
    use crate::handler::response_header_handler::ResponseHeaderHandler;
    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
    use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};

    /// Handler names the tests expect `add_default_handlers` to install, in order.
    ///
    /// Every builder test expresses its expectation as a small edit of this list,
    /// so a change to the default pipeline only has to be mirrored here.
    const DEFAULT_HANDLER_NAMES: [&str; 15] = [
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "RemapFlowPeerHandler",
    ];

    /// Returns the index of `target` within [`DEFAULT_HANDLER_NAMES`].
    ///
    /// Panics if `target` is not one of the default handler names.
    fn default_position(target: &str) -> usize {
        DEFAULT_HANDLER_NAMES
            .iter()
            .position(|name| *name == target)
            .unwrap_or_else(|| panic!("{target} is not a default handler"))
    }

    /// Asserts that `group` holds exactly the handlers named in `expected`, in order.
    ///
    /// `#[track_caller]` makes a failure point at the calling test instead of this helper.
    #[track_caller]
    fn assert_handler_names(group: &HeartbeatHandlerGroup, expected: &[&str]) {
        assert_eq!(expected.len(), group.handlers.len());
        for (idx, (handler, name)) in group.handlers.iter().zip(expected).enumerate() {
            assert_eq!(handler.name, *name, "unexpected handler at position {idx}");
        }
    }

    /// A reply delivered through `on_recv` is handed to the receiver returned by `send`.
    #[tokio::test]
    async fn test_mailbox() {
        let (mailbox, receiver) = push_msg_via_mailbox().await;
        let id = receiver.message_id();

        let resp_msg = MailboxMessage {
            id,
            subject: "resp-test".to_string(),
            timestamp_millis: 456,
            ..Default::default()
        };

        mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();

        let recv_msg = receiver.await.unwrap();
        assert_eq!(recv_msg.id, id);
        assert_eq!(recv_msg.timestamp_millis, 456);
        assert_eq!(recv_msg.subject, "resp-test");
    }

    /// No reply is ever delivered via `on_recv`, so awaiting the receiver must yield
    /// an error (the request is sent with a one-second timeout).
    #[tokio::test]
    async fn test_mailbox_timeout() {
        let (_, receiver) = push_msg_via_mailbox().await;
        let res = receiver.await;
        assert!(res.is_err());
    }

    /// Registers a pusher for datanode 12, sends one request through a fresh mailbox,
    /// checks that the pusher's channel received it, and returns the mailbox together
    /// with the receiver for the still-pending reply.
    async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
        let datanode_id = 12;
        let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
        let pusher_id = PusherId::new(Role::Datanode, datanode_id);
        let pusher = Pusher::new(pusher_tx);
        let handler_group = HeartbeatHandlerGroup::default();
        handler_group.register_pusher(pusher_id, pusher).await;

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let seq = SequenceBuilder::new("test_seq", kv_backend).build();
        let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);

        let msg = MailboxMessage {
            id: 0,
            subject: "req-test".to_string(),
            timestamp_millis: 123,
            ..Default::default()
        };
        let ch = Channel::Datanode(datanode_id);

        let receiver = mailbox
            .send(&ch, msg, Duration::from_secs(1))
            .await
            .unwrap();

        // The request must have been pushed to the registered pusher's channel.
        let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
        let message = recv_obj.mailbox_message.unwrap();
        assert_eq!(message.timestamp_millis, 123);
        assert_eq!(message.subject, "req-test");

        (mailbox, receiver)
    }

    /// The default pipeline contains exactly [`DEFAULT_HANDLER_NAMES`].
    #[test]
    fn test_handler_group_builder() {
        let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
            .add_default_handlers()
            .build()
            .unwrap();

        assert_handler_names(&group, &DEFAULT_HANDLER_NAMES);
    }

    /// Inserting before a handler in the middle of the pipeline.
    #[test]
    fn test_handler_group_builder_add_before() {
        let target = "FilterInactiveRegionStatsHandler";
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_before(target, CollectStatsHandler::default())
            .unwrap();

        let mut expected = DEFAULT_HANDLER_NAMES.to_vec();
        expected.insert(default_position(target), "CollectStatsHandler");
        assert_handler_names(&builder.build().unwrap(), &expected);
    }

    /// Inserting before the first handler puts the new handler at the front.
    #[test]
    fn test_handler_group_builder_add_before_first() {
        let first = DEFAULT_HANDLER_NAMES[0];
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_before(first, CollectStatsHandler::default())
            .unwrap();

        let mut expected = DEFAULT_HANDLER_NAMES.to_vec();
        expected.insert(0, "CollectStatsHandler");
        assert_handler_names(&builder.build().unwrap(), &expected);
    }

    /// Inserting after a handler in the middle of the pipeline.
    #[test]
    fn test_handler_group_builder_add_after() {
        let target = "MailboxHandler";
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_after(target, CollectStatsHandler::default())
            .unwrap();

        let mut expected = DEFAULT_HANDLER_NAMES.to_vec();
        expected.insert(default_position(target) + 1, "CollectStatsHandler");
        assert_handler_names(&builder.build().unwrap(), &expected);
    }

    /// Inserting after the last handler appends the new handler.
    #[test]
    fn test_handler_group_builder_add_after_last() {
        // Target whatever is currently last so this keeps covering the end-of-list
        // case even when more default handlers are appended.
        let last = DEFAULT_HANDLER_NAMES[DEFAULT_HANDLER_NAMES.len() - 1];
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .add_handler_after(last, ResponseHeaderHandler)
            .unwrap();

        let mut expected = DEFAULT_HANDLER_NAMES.to_vec();
        expected.push("ResponseHeaderHandler");
        assert_handler_names(&builder.build().unwrap(), &expected);
    }

    /// Replacing a handler in the middle of the pipeline.
    #[test]
    fn test_handler_group_builder_replace() {
        let target = "MailboxHandler";
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .replace_handler(target, CollectStatsHandler::default())
            .unwrap();

        let mut expected = DEFAULT_HANDLER_NAMES.to_vec();
        expected[default_position(target)] = "CollectStatsHandler";
        assert_handler_names(&builder.build().unwrap(), &expected);
    }

    /// Replacing the last handler keeps the pipeline length and swaps only the tail.
    #[test]
    fn test_handler_group_builder_replace_last() {
        // Target whatever is currently last so this keeps covering the end-of-list
        // case even when more default handlers are appended.
        let last_idx = DEFAULT_HANDLER_NAMES.len() - 1;
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .replace_handler(DEFAULT_HANDLER_NAMES[last_idx], ResponseHeaderHandler)
            .unwrap();

        let mut expected = DEFAULT_HANDLER_NAMES.to_vec();
        expected[last_idx] = "ResponseHeaderHandler";
        assert_handler_names(&builder.build().unwrap(), &expected);
    }

    /// Replacing the first handler keeps the pipeline length and swaps only the head.
    #[test]
    fn test_handler_group_builder_replace_first() {
        let first = DEFAULT_HANDLER_NAMES[0];
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        builder
            .replace_handler(first, CollectStatsHandler::default())
            .unwrap();

        let mut expected = DEFAULT_HANDLER_NAMES.to_vec();
        expected[0] = "CollectStatsHandler";
        assert_handler_names(&builder.build().unwrap(), &expected);
    }

    /// Every positional operation reports `HandlerNotFound` for an unknown target.
    #[test]
    fn test_handler_group_builder_handler_not_found() {
        let mut builder =
            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
        let err = builder
            .add_handler_before("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });

        let err = builder
            .add_handler_after("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });

        let err = builder
            .replace_handler("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });
    }

    /// Dropping a `Pusher` is expected to signal clones of its deregister receiver.
    #[tokio::test]
    async fn test_pusher_drop() {
        let (tx, _rx) = mpsc::channel(1);
        let pusher = Pusher::new(tx);
        let mut deregister_signal_rx = pusher.deregister_signal_receiver.clone();

        drop(pusher);
        // `changed()` must resolve to `Ok`, i.e. a change was observed after the drop.
        deregister_signal_rx.changed().await.unwrap();
    }
}