meta_srv/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23    HeartbeatRequest, HeartbeatResponse, MailboxMessage, PROTOCOL_VERSION, RegionLease,
24    ResponseHeader, Role,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28    CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29    CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::{Instruction, InstructionReply};
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{Notify, RwLock, oneshot, watch};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63    BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89    fn is_acceptable(&self, role: Role) -> bool;
90
91    fn name(&self) -> &'static str {
92        let type_name = std::any::type_name::<Self>();
93        // short name
94        type_name.split("::").last().unwrap_or(type_name)
95    }
96
97    async fn handle(
98        &self,
99        req: &HeartbeatRequest,
100        ctx: &mut Context,
101        acc: &mut HeartbeatAccumulator,
102    ) -> Result<HandleControl>;
103}
104
105/// HandleControl
106///
107/// Controls process of handling heartbeat request.
108#[derive(PartialEq, Debug)]
109pub enum HandleControl {
110    Continue,
111    Done,
112}
113
114#[derive(Debug, Default)]
115pub struct HeartbeatAccumulator {
116    pub header: Option<ResponseHeader>,
117    pub instructions: Vec<Instruction>,
118    pub stat: Option<Stat>,
119    pub inactive_region_ids: HashSet<RegionId>,
120    pub region_lease: Option<RegionLease>,
121}
122
123impl HeartbeatAccumulator {
124    pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
125        // TODO(jiachun): to HeartbeatResponse payload
126        None
127    }
128}
129
130#[derive(Copy, Clone)]
131pub struct PusherId {
132    pub role: Role,
133    pub id: u64,
134}
135
136impl Debug for PusherId {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(f, "{:?}-{}", self.role, self.id)
139    }
140}
141
142impl Display for PusherId {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        write!(f, "{:?}-{}", self.role, self.id)
145    }
146}
147
148impl PusherId {
149    pub fn new(role: Role, id: u64) -> Self {
150        Self { role, id }
151    }
152
153    pub fn string_key(&self) -> String {
154        format!("{}-{}", self.role as i32, self.id)
155    }
156}
157
158/// The receiver of the deregister signal.
159pub type DeregisterSignalReceiver = watch::Receiver<bool>;
160
161/// The pusher of the heartbeat response.
162pub struct Pusher {
163    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
164    // The sender of the deregister signal.
165    // default is false, means the pusher is not deregistered.
166    // when the pusher is deregistered, the sender will be notified.
167    deregister_signal_sender: watch::Sender<bool>,
168    deregister_signal_receiver: DeregisterSignalReceiver,
169
170    res_header: ResponseHeader,
171}
172
173impl Drop for Pusher {
174    fn drop(&mut self) {
175        // Ignore the error here.
176        // if all the receivers have been dropped, means no body cares the deregister signal.
177        let _ = self.deregister_signal_sender.send(true);
178    }
179}
180
181impl Pusher {
182    pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
183        let res_header = ResponseHeader {
184            protocol_version: PROTOCOL_VERSION,
185            ..Default::default()
186        };
187        let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
188        Self {
189            sender,
190            deregister_signal_sender,
191            deregister_signal_receiver,
192            res_header,
193        }
194    }
195
196    #[inline]
197    pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
198        self.sender.send(Ok(res)).await.map_err(|e| {
199            error::PushMessageSnafu {
200                err_msg: e.to_string(),
201            }
202            .build()
203        })
204    }
205
206    #[inline]
207    pub fn header(&self) -> ResponseHeader {
208        self.res_header.clone()
209    }
210}
211
212/// The group of heartbeat pushers.
213#[derive(Clone, Default)]
214pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
215
216impl Pushers {
217    async fn push(
218        &self,
219        pusher_id: PusherId,
220        mailbox_message: MailboxMessage,
221    ) -> Result<DeregisterSignalReceiver> {
222        let pusher_id = pusher_id.string_key();
223        let pushers = self.0.read().await;
224        let pusher = pushers
225            .get(&pusher_id)
226            .context(error::PusherNotFoundSnafu { pusher_id })?;
227
228        pusher
229            .push(HeartbeatResponse {
230                header: Some(pusher.header()),
231                mailbox_message: Some(mailbox_message),
232                ..Default::default()
233            })
234            .await?;
235
236        Ok(pusher.deregister_signal_receiver.clone())
237    }
238
239    async fn broadcast(
240        &self,
241        range: Range<String>,
242        mailbox_message: &MailboxMessage,
243    ) -> Result<()> {
244        let pushers = self.0.read().await;
245        let pushers = pushers
246            .range(range)
247            .map(|(_, value)| value)
248            .collect::<Vec<_>>();
249        let mut results = Vec::with_capacity(pushers.len());
250
251        for pusher in pushers {
252            let mut mailbox_message = mailbox_message.clone();
253            mailbox_message.id = 0; // one-way message
254
255            results.push(pusher.push(HeartbeatResponse {
256                header: Some(pusher.header()),
257                mailbox_message: Some(mailbox_message),
258                ..Default::default()
259            }))
260        }
261
262        // Checks the error out of the loop.
263        let _ = join_all(results)
264            .await
265            .into_iter()
266            .collect::<Result<Vec<_>>>()?;
267
268        Ok(())
269    }
270
271    pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
272        self.0.write().await.insert(pusher_id, pusher)
273    }
274
275    async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
276        self.0.write().await.remove(pusher_id)
277    }
278}
279
280#[derive(Clone)]
281pub struct NameCachedHandler {
282    name: &'static str,
283    handler: Arc<dyn HeartbeatHandler>,
284}
285
286impl NameCachedHandler {
287    fn new(handler: impl HeartbeatHandler + 'static) -> Self {
288        let name = handler.name();
289        let handler = Arc::new(handler);
290        Self { name, handler }
291    }
292}
293
294pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
295
296/// The group of heartbeat handlers.
297#[derive(Default, Clone)]
298pub struct HeartbeatHandlerGroup {
299    handlers: Vec<NameCachedHandler>,
300    pushers: Pushers,
301}
302
303impl HeartbeatHandlerGroup {
304    /// Registers the heartbeat response [`Pusher`] with the given key to the group.
305    pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
306        METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
307        info!("Pusher register: {}", pusher_id);
308        let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
309    }
310
311    /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
312    ///
313    /// Returns the [`Pusher`] if it exists.
314    pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
315        METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
316        info!("Pusher unregister: {}", pusher_id);
317        self.pushers.remove(&pusher_id.string_key()).await
318    }
319
320    /// Returns the [`Pushers`] of the group.
321    pub fn pushers(&self) -> Pushers {
322        self.pushers.clone()
323    }
324
325    /// Handles the heartbeat request.
326    pub async fn handle(
327        &self,
328        req: HeartbeatRequest,
329        mut ctx: Context,
330    ) -> Result<HeartbeatResponse> {
331        let mut acc = HeartbeatAccumulator::default();
332        let role = req
333            .header
334            .as_ref()
335            .and_then(|h| Role::try_from(h.role).ok())
336            .context(error::InvalidArgumentsSnafu {
337                err_msg: format!("invalid role: {:?}", req.header),
338            })?;
339
340        for NameCachedHandler { name, handler } in self.handlers.iter() {
341            if !handler.is_acceptable(role) {
342                continue;
343            }
344
345            let _timer = METRIC_META_HANDLER_EXECUTE
346                .with_label_values(&[*name])
347                .start_timer();
348
349            if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
350                break;
351            }
352        }
353        let header = std::mem::take(&mut acc.header);
354        let res = HeartbeatResponse {
355            header,
356            region_lease: acc.region_lease,
357            ..Default::default()
358        };
359        Ok(res)
360    }
361}
362
363pub struct HeartbeatMailbox {
364    pushers: Pushers,
365    sequence: Sequence,
366    senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
367    timeouts: DashMap<MessageId, Instant>,
368    timeout_notify: Notify,
369}
370
371impl HeartbeatMailbox {
372    pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
373        let Payload::Json(payload) =
374            msg.payload
375                .as_ref()
376                .with_context(|| UnexpectedInstructionReplySnafu {
377                    mailbox_message: msg.to_string(),
378                    reason: format!("empty payload, msg: {msg:?}"),
379                })?;
380        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
381    }
382
383    /// Parses the [Instruction] from [MailboxMessage].
384    #[cfg(test)]
385    pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
386        let Payload::Json(payload) =
387            msg.payload
388                .as_ref()
389                .with_context(|| UnexpectedInstructionReplySnafu {
390                    mailbox_message: msg.to_string(),
391                    reason: format!("empty payload, msg: {msg:?}"),
392                })?;
393        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
394    }
395
396    pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
397        let mailbox = Arc::new(Self::new(pushers, sequence));
398
399        let timeout_checker = mailbox.clone();
400        let _handle = common_runtime::spawn_global(async move {
401            timeout_checker.check_timeout_bg(10).await;
402        });
403
404        mailbox
405    }
406
407    fn new(pushers: Pushers, sequence: Sequence) -> Self {
408        Self {
409            pushers,
410            sequence,
411            senders: DashMap::default(),
412            timeouts: DashMap::default(),
413            timeout_notify: Notify::new(),
414        }
415    }
416
417    async fn check_timeout_bg(&self, interval_millis: u64) {
418        let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
419
420        loop {
421            let _ = interval.tick().await;
422
423            if self.timeouts.is_empty() {
424                self.timeout_notify.notified().await;
425            }
426
427            let now = Instant::now();
428            let timeout_ids = self
429                .timeouts
430                .iter()
431                .filter_map(|entry| {
432                    let (id, deadline) = entry.pair();
433                    if deadline < &now { Some(*id) } else { None }
434                })
435                .collect::<Vec<_>>();
436
437            for id in timeout_ids {
438                let _ = self
439                    .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
440                    .await;
441            }
442        }
443    }
444
445    #[inline]
446    async fn next_message_id(&self) -> Result<u64> {
447        // In this implementation, we pre-occupy the message_id of 0,
448        // and we use `message_id = 0` to mark a Message as a one-way call.
449        loop {
450            let next = self
451                .sequence
452                .next()
453                .await
454                .context(error::NextSequenceSnafu)?;
455            if next > 0 {
456                return Ok(next);
457            }
458        }
459    }
460}
461
462#[async_trait::async_trait]
463impl Mailbox for HeartbeatMailbox {
464    async fn send(
465        &self,
466        ch: &Channel,
467        mut msg: MailboxMessage,
468        timeout: Duration,
469    ) -> Result<MailboxReceiver> {
470        let message_id = self.next_message_id().await?;
471        msg.id = message_id;
472
473        let pusher_id = ch.pusher_id();
474        debug!("Sending mailbox message {msg:?} to {pusher_id}");
475
476        let (tx, rx) = oneshot::channel();
477        let _ = self.senders.insert(message_id, tx);
478        let deadline = Instant::now() + timeout;
479        self.timeouts.insert(message_id, deadline);
480        self.timeout_notify.notify_one();
481
482        let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
483
484        Ok(MailboxReceiver::new(
485            message_id,
486            rx,
487            deregister_signal_receiver,
488            *ch,
489        ))
490    }
491
492    async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
493        let message_id = 0; // one-way message, same as `broadcast`
494        msg.id = message_id;
495
496        let pusher_id = ch.pusher_id();
497        debug!("Sending mailbox message {msg:?} to {pusher_id}");
498
499        self.pushers.push(pusher_id, msg).await?;
500
501        Ok(())
502    }
503
504    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
505        self.pushers.broadcast(ch.pusher_range(), msg).await
506    }
507
508    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
509        debug!("Received mailbox message {maybe_msg:?}");
510
511        let _ = self.timeouts.remove(&id);
512
513        if let Some((_, tx)) = self.senders.remove(&id) {
514            tx.send(maybe_msg)
515                .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
516        } else if let Ok(finally_msg) = maybe_msg {
517            warn!("The response arrived too late: {finally_msg:?}");
518        }
519
520        Ok(())
521    }
522}
523
524/// The builder to build the group of heartbeat handlers.
525pub struct HeartbeatHandlerGroupBuilder {
526    /// The handler to handle region failure.
527    region_failure_handler: Option<RegionFailureHandler>,
528
529    /// The handler to handle region lease.
530    region_lease_handler: Option<RegionLeaseHandler>,
531
532    /// The factor that determines how often statistics should be flushed,
533    /// based on the number of received heartbeats. When the number of heartbeats
534    /// reaches this factor, a flush operation is triggered.
535    flush_stats_factor: Option<usize>,
536    /// A simple handler for flow internal state report
537    flow_state_handler: Option<FlowStateHandler>,
538
539    /// The handler to persist stats.
540    persist_stats_handler: Option<PersistStatsHandler>,
541
542    /// The plugins.
543    plugins: Option<Plugins>,
544
545    /// The heartbeat response pushers.
546    pushers: Pushers,
547
548    /// The group of heartbeat handlers.
549    handlers: Vec<NameCachedHandler>,
550}
551
552impl HeartbeatHandlerGroupBuilder {
553    pub fn new(pushers: Pushers) -> Self {
554        Self {
555            region_failure_handler: None,
556            region_lease_handler: None,
557            flush_stats_factor: None,
558            flow_state_handler: None,
559            persist_stats_handler: None,
560            plugins: None,
561            pushers,
562            handlers: vec![],
563        }
564    }
565
566    pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
567        self.flow_state_handler = handler;
568        self
569    }
570
571    pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
572        self.region_lease_handler = handler;
573        self
574    }
575
576    /// Sets the [`RegionFailureHandler`].
577    pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
578        self.region_failure_handler = handler;
579        self
580    }
581
582    /// Sets the flush stats factor.
583    pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
584        self.flush_stats_factor = flush_stats_factor;
585        self
586    }
587
588    pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
589        self.persist_stats_handler = handler;
590        self
591    }
592
593    /// Sets the [`Plugins`].
594    pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
595        self.plugins = plugins;
596        self
597    }
598
599    /// Adds the default handlers.
600    pub fn add_default_handlers(mut self) -> Self {
601        // Extract the `PublishHeartbeatHandler` from the plugins.
602        let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
603            plugins
604                .get::<PublisherRef>()
605                .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
606        } else {
607            None
608        };
609
610        self.add_handler_last(ResponseHeaderHandler);
611        // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
612        // because even if the current meta-server node is no longer the leader it can
613        // still help the datanode to keep lease.
614        self.add_handler_last(DatanodeKeepLeaseHandler);
615        self.add_handler_last(FlownodeKeepLeaseHandler);
616        self.add_handler_last(CheckLeaderHandler);
617        self.add_handler_last(OnLeaderStartHandler);
618        self.add_handler_last(ExtractStatHandler);
619        self.add_handler_last(CollectDatanodeClusterInfoHandler);
620        self.add_handler_last(CollectFrontendClusterInfoHandler);
621        self.add_handler_last(CollectFlownodeClusterInfoHandler);
622        self.add_handler_last(MailboxHandler);
623        if let Some(region_lease_handler) = self.region_lease_handler.take() {
624            self.add_handler_last(region_lease_handler);
625        }
626        self.add_handler_last(FilterInactiveRegionStatsHandler);
627        if let Some(region_failure_handler) = self.region_failure_handler.take() {
628            self.add_handler_last(region_failure_handler);
629        }
630        if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
631            self.add_handler_last(publish_heartbeat_handler);
632        }
633        self.add_handler_last(CollectLeaderRegionHandler);
634        self.add_handler_last(CollectTopicStatsHandler);
635        // Persist stats handler should be in front of collect stats handler.
636        // Because collect stats handler will consume the stats from the accumulator.
637        if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
638            self.add_handler_last(persist_stats_handler);
639        }
640        self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
641        self.add_handler_last(RemapFlowPeerHandler::default());
642
643        if let Some(flow_state_handler) = self.flow_state_handler.take() {
644            self.add_handler_last(flow_state_handler);
645        }
646
647        self
648    }
649
650    /// Builds the group of heartbeat handlers.
651    ///
652    /// Applies the customizer if it exists.
653    pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
654        if let Some(customizer) = self
655            .plugins
656            .as_ref()
657            .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
658        {
659            debug!("Customizing the heartbeat handler group builder");
660            customizer.customize(&mut self)?;
661        }
662
663        Ok(HeartbeatHandlerGroup {
664            handlers: self.handlers,
665            pushers: self.pushers,
666        })
667    }
668
669    fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
670        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
671            self.handlers.insert(pos + 1, handler);
672            return Ok(());
673        }
674
675        error::HandlerNotFoundSnafu { name: target }.fail()
676    }
677
678    /// Adds the handler after the specified handler.
679    pub fn add_handler_after(
680        &mut self,
681        target: &'static str,
682        handler: impl HeartbeatHandler + 'static,
683    ) -> Result<()> {
684        self.add_handler_after_inner(target, NameCachedHandler::new(handler))
685    }
686
687    fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
688        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
689            self.handlers.insert(pos, handler);
690            return Ok(());
691        }
692
693        error::HandlerNotFoundSnafu { name: target }.fail()
694    }
695
696    /// Adds the handler before the specified handler.
697    pub fn add_handler_before(
698        &mut self,
699        target: &'static str,
700        handler: impl HeartbeatHandler + 'static,
701    ) -> Result<()> {
702        self.add_handler_before_inner(target, NameCachedHandler::new(handler))
703    }
704
705    fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
706        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
707            self.handlers[pos] = handler;
708            return Ok(());
709        }
710
711        error::HandlerNotFoundSnafu { name: target }.fail()
712    }
713
714    /// Replaces the handler with the specified name.
715    pub fn replace_handler(
716        &mut self,
717        target: &'static str,
718        handler: impl HeartbeatHandler + 'static,
719    ) -> Result<()> {
720        self.replace_handler_inner(target, NameCachedHandler::new(handler))
721    }
722
723    fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
724        self.handlers.push(handler);
725    }
726
727    fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
728        self.add_handler_last_inner(NameCachedHandler::new(handler));
729    }
730}
731
732pub type HeartbeatHandlerGroupBuilderCustomizerRef =
733    Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
734
735pub enum CustomizeHeartbeatGroupAction {
736    AddHandlerAfter {
737        target: String,
738        handler: NameCachedHandler,
739    },
740    AddHandlerBefore {
741        target: String,
742        handler: NameCachedHandler,
743    },
744    ReplaceHandler {
745        target: String,
746        handler: NameCachedHandler,
747    },
748    AddHandlerLast {
749        handler: NameCachedHandler,
750    },
751}
752
753impl CustomizeHeartbeatGroupAction {
754    pub fn new_add_handler_after(
755        target: &'static str,
756        handler: impl HeartbeatHandler + 'static,
757    ) -> Self {
758        Self::AddHandlerAfter {
759            target: target.to_string(),
760            handler: NameCachedHandler::new(handler),
761        }
762    }
763
764    pub fn new_add_handler_before(
765        target: &'static str,
766        handler: impl HeartbeatHandler + 'static,
767    ) -> Self {
768        Self::AddHandlerBefore {
769            target: target.to_string(),
770            handler: NameCachedHandler::new(handler),
771        }
772    }
773
774    pub fn new_replace_handler(
775        target: &'static str,
776        handler: impl HeartbeatHandler + 'static,
777    ) -> Self {
778        Self::ReplaceHandler {
779            target: target.to_string(),
780            handler: NameCachedHandler::new(handler),
781        }
782    }
783
784    pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
785        Self::AddHandlerLast {
786            handler: NameCachedHandler::new(handler),
787        }
788    }
789}
790
791/// The customizer of the [`HeartbeatHandlerGroupBuilder`].
792pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
793    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
794
795    fn add_action(&self, action: CustomizeHeartbeatGroupAction);
796}
797
798#[derive(Default)]
799pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
800    actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
801}
802
803impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
804    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
805        info!("Customizing the heartbeat handler group builder");
806        let mut actions = self.actions.lock().unwrap();
807        for action in actions.drain(..) {
808            match action {
809                CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
810                    builder.add_handler_after_inner(&target, handler)?;
811                }
812                CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
813                    builder.add_handler_before_inner(&target, handler)?;
814                }
815                CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
816                    builder.replace_handler_inner(&target, handler)?;
817                }
818                CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
819                    builder.add_handler_last_inner(handler);
820                }
821            }
822        }
823        Ok(())
824    }
825
826    fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
827        self.actions.lock().unwrap().push(action);
828    }
829}
830
831#[cfg(test)]
832mod tests {
833
834    use std::assert_matches::assert_matches;
835    use std::sync::Arc;
836    use std::time::Duration;
837
838    use api::v1::meta::{MailboxMessage, Role};
839    use common_meta::kv_backend::memory::MemoryKvBackend;
840    use common_meta::sequence::SequenceBuilder;
841    use tokio::sync::mpsc;
842
843    use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
844    use crate::error;
845    use crate::handler::collect_stats_handler::CollectStatsHandler;
846    use crate::handler::response_header_handler::ResponseHeaderHandler;
847    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
848    use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
849
850    #[tokio::test]
851    async fn test_mailbox() {
852        let (mailbox, receiver) = push_msg_via_mailbox().await;
853        let id = receiver.message_id();
854
855        let resp_msg = MailboxMessage {
856            id,
857            subject: "resp-test".to_string(),
858            timestamp_millis: 456,
859            ..Default::default()
860        };
861
862        mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
863
864        let recv_msg = receiver.await.unwrap();
865        assert_eq!(recv_msg.id, id);
866        assert_eq!(recv_msg.timestamp_millis, 456);
867        assert_eq!(recv_msg.subject, "resp-test".to_string());
868    }
869
870    #[tokio::test]
871    async fn test_mailbox_timeout() {
872        let (_, receiver) = push_msg_via_mailbox().await;
873        let res = receiver.await;
874        assert!(res.is_err());
875    }
876
877    async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
878        let datanode_id = 12;
879        let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
880        let pusher_id = PusherId::new(Role::Datanode, datanode_id);
881        let pusher: Pusher = Pusher::new(pusher_tx);
882        let handler_group = HeartbeatHandlerGroup::default();
883        handler_group.register_pusher(pusher_id, pusher).await;
884
885        let kv_backend = Arc::new(MemoryKvBackend::new());
886        let seq = SequenceBuilder::new("test_seq", kv_backend).build();
887        let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
888
889        let msg = MailboxMessage {
890            id: 0,
891            subject: "req-test".to_string(),
892            timestamp_millis: 123,
893            ..Default::default()
894        };
895        let ch = Channel::Datanode(datanode_id);
896
897        let receiver = mailbox
898            .send(&ch, msg, Duration::from_secs(1))
899            .await
900            .unwrap();
901
902        let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
903        let message = recv_obj.mailbox_message.unwrap();
904        assert_eq!(message.timestamp_millis, 123);
905        assert_eq!(message.subject, "req-test".to_string());
906
907        (mailbox, receiver)
908    }
909
910    #[test]
911    fn test_handler_group_builder() {
912        let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
913            .add_default_handlers()
914            .build()
915            .unwrap();
916
917        let handlers = group.handlers;
918        let names = [
919            "ResponseHeaderHandler",
920            "DatanodeKeepLeaseHandler",
921            "FlownodeKeepLeaseHandler",
922            "CheckLeaderHandler",
923            "OnLeaderStartHandler",
924            "ExtractStatHandler",
925            "CollectDatanodeClusterInfoHandler",
926            "CollectFrontendClusterInfoHandler",
927            "CollectFlownodeClusterInfoHandler",
928            "MailboxHandler",
929            "FilterInactiveRegionStatsHandler",
930            "CollectLeaderRegionHandler",
931            "CollectTopicStatsHandler",
932            "CollectStatsHandler",
933            "RemapFlowPeerHandler",
934        ];
935        assert_eq!(names.len(), handlers.len());
936        for (handler, name) in handlers.iter().zip(names.into_iter()) {
937            assert_eq!(handler.name, name);
938        }
939    }
940
941    #[test]
942    fn test_handler_group_builder_add_before() {
943        let mut builder =
944            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
945        builder
946            .add_handler_before(
947                "FilterInactiveRegionStatsHandler",
948                CollectStatsHandler::default(),
949            )
950            .unwrap();
951
952        let group = builder.build().unwrap();
953        let handlers = group.handlers;
954        let names = [
955            "ResponseHeaderHandler",
956            "DatanodeKeepLeaseHandler",
957            "FlownodeKeepLeaseHandler",
958            "CheckLeaderHandler",
959            "OnLeaderStartHandler",
960            "ExtractStatHandler",
961            "CollectDatanodeClusterInfoHandler",
962            "CollectFrontendClusterInfoHandler",
963            "CollectFlownodeClusterInfoHandler",
964            "MailboxHandler",
965            "CollectStatsHandler",
966            "FilterInactiveRegionStatsHandler",
967            "CollectLeaderRegionHandler",
968            "CollectTopicStatsHandler",
969            "CollectStatsHandler",
970            "RemapFlowPeerHandler",
971        ];
972        assert_eq!(names.len(), handlers.len());
973        for (handler, name) in handlers.iter().zip(names.into_iter()) {
974            assert_eq!(handler.name, name);
975        }
976    }
977
978    #[test]
979    fn test_handler_group_builder_add_before_first() {
980        let mut builder =
981            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
982        builder
983            .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
984            .unwrap();
985
986        let group = builder.build().unwrap();
987        let handlers = group.handlers;
988        let names = [
989            "CollectStatsHandler",
990            "ResponseHeaderHandler",
991            "DatanodeKeepLeaseHandler",
992            "FlownodeKeepLeaseHandler",
993            "CheckLeaderHandler",
994            "OnLeaderStartHandler",
995            "ExtractStatHandler",
996            "CollectDatanodeClusterInfoHandler",
997            "CollectFrontendClusterInfoHandler",
998            "CollectFlownodeClusterInfoHandler",
999            "MailboxHandler",
1000            "FilterInactiveRegionStatsHandler",
1001            "CollectLeaderRegionHandler",
1002            "CollectTopicStatsHandler",
1003            "CollectStatsHandler",
1004            "RemapFlowPeerHandler",
1005        ];
1006        assert_eq!(names.len(), handlers.len());
1007        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1008            assert_eq!(handler.name, name);
1009        }
1010    }
1011
1012    #[test]
1013    fn test_handler_group_builder_add_after() {
1014        let mut builder =
1015            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1016        builder
1017            .add_handler_after("MailboxHandler", CollectStatsHandler::default())
1018            .unwrap();
1019
1020        let group = builder.build().unwrap();
1021        let handlers = group.handlers;
1022        let names = [
1023            "ResponseHeaderHandler",
1024            "DatanodeKeepLeaseHandler",
1025            "FlownodeKeepLeaseHandler",
1026            "CheckLeaderHandler",
1027            "OnLeaderStartHandler",
1028            "ExtractStatHandler",
1029            "CollectDatanodeClusterInfoHandler",
1030            "CollectFrontendClusterInfoHandler",
1031            "CollectFlownodeClusterInfoHandler",
1032            "MailboxHandler",
1033            "CollectStatsHandler",
1034            "FilterInactiveRegionStatsHandler",
1035            "CollectLeaderRegionHandler",
1036            "CollectTopicStatsHandler",
1037            "CollectStatsHandler",
1038            "RemapFlowPeerHandler",
1039        ];
1040        assert_eq!(names.len(), handlers.len());
1041        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1042            assert_eq!(handler.name, name);
1043        }
1044    }
1045
1046    #[test]
1047    fn test_handler_group_builder_add_after_last() {
1048        let mut builder =
1049            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1050        builder
1051            .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
1052            .unwrap();
1053
1054        let group = builder.build().unwrap();
1055        let handlers = group.handlers;
1056        let names = [
1057            "ResponseHeaderHandler",
1058            "DatanodeKeepLeaseHandler",
1059            "FlownodeKeepLeaseHandler",
1060            "CheckLeaderHandler",
1061            "OnLeaderStartHandler",
1062            "ExtractStatHandler",
1063            "CollectDatanodeClusterInfoHandler",
1064            "CollectFrontendClusterInfoHandler",
1065            "CollectFlownodeClusterInfoHandler",
1066            "MailboxHandler",
1067            "FilterInactiveRegionStatsHandler",
1068            "CollectLeaderRegionHandler",
1069            "CollectTopicStatsHandler",
1070            "CollectStatsHandler",
1071            "ResponseHeaderHandler",
1072            "RemapFlowPeerHandler",
1073        ];
1074        assert_eq!(names.len(), handlers.len());
1075        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1076            assert_eq!(handler.name, name);
1077        }
1078    }
1079
1080    #[test]
1081    fn test_handler_group_builder_replace() {
1082        let mut builder =
1083            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1084        builder
1085            .replace_handler("MailboxHandler", CollectStatsHandler::default())
1086            .unwrap();
1087
1088        let group = builder.build().unwrap();
1089        let handlers = group.handlers;
1090        let names = [
1091            "ResponseHeaderHandler",
1092            "DatanodeKeepLeaseHandler",
1093            "FlownodeKeepLeaseHandler",
1094            "CheckLeaderHandler",
1095            "OnLeaderStartHandler",
1096            "ExtractStatHandler",
1097            "CollectDatanodeClusterInfoHandler",
1098            "CollectFrontendClusterInfoHandler",
1099            "CollectFlownodeClusterInfoHandler",
1100            "CollectStatsHandler",
1101            "FilterInactiveRegionStatsHandler",
1102            "CollectLeaderRegionHandler",
1103            "CollectTopicStatsHandler",
1104            "CollectStatsHandler",
1105            "RemapFlowPeerHandler",
1106        ];
1107
1108        assert_eq!(names.len(), handlers.len());
1109        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1110            assert_eq!(handler.name, name);
1111        }
1112    }
1113
1114    #[test]
1115    fn test_handler_group_builder_replace_last() {
1116        let mut builder =
1117            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1118        builder
1119            .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
1120            .unwrap();
1121
1122        let group = builder.build().unwrap();
1123        let handlers = group.handlers;
1124        let names = [
1125            "ResponseHeaderHandler",
1126            "DatanodeKeepLeaseHandler",
1127            "FlownodeKeepLeaseHandler",
1128            "CheckLeaderHandler",
1129            "OnLeaderStartHandler",
1130            "ExtractStatHandler",
1131            "CollectDatanodeClusterInfoHandler",
1132            "CollectFrontendClusterInfoHandler",
1133            "CollectFlownodeClusterInfoHandler",
1134            "MailboxHandler",
1135            "FilterInactiveRegionStatsHandler",
1136            "CollectLeaderRegionHandler",
1137            "CollectTopicStatsHandler",
1138            "ResponseHeaderHandler",
1139            "RemapFlowPeerHandler",
1140        ];
1141
1142        assert_eq!(names.len(), handlers.len());
1143        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1144            assert_eq!(handler.name, name);
1145        }
1146    }
1147
1148    #[test]
1149    fn test_handler_group_builder_replace_first() {
1150        let mut builder =
1151            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1152        builder
1153            .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
1154            .unwrap();
1155
1156        let group = builder.build().unwrap();
1157        let handlers = group.handlers;
1158        let names = [
1159            "CollectStatsHandler",
1160            "DatanodeKeepLeaseHandler",
1161            "FlownodeKeepLeaseHandler",
1162            "CheckLeaderHandler",
1163            "OnLeaderStartHandler",
1164            "ExtractStatHandler",
1165            "CollectDatanodeClusterInfoHandler",
1166            "CollectFrontendClusterInfoHandler",
1167            "CollectFlownodeClusterInfoHandler",
1168            "MailboxHandler",
1169            "FilterInactiveRegionStatsHandler",
1170            "CollectLeaderRegionHandler",
1171            "CollectTopicStatsHandler",
1172            "CollectStatsHandler",
1173            "RemapFlowPeerHandler",
1174        ];
1175        assert_eq!(names.len(), handlers.len());
1176        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1177            assert_eq!(handler.name, name);
1178        }
1179    }
1180
1181    #[test]
1182    fn test_handler_group_builder_handler_not_found() {
1183        let mut builder =
1184            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1185        let err = builder
1186            .add_handler_before("NotExists", CollectStatsHandler::default())
1187            .unwrap_err();
1188        assert_matches!(err, error::Error::HandlerNotFound { .. });
1189
1190        let err = builder
1191            .add_handler_after("NotExists", CollectStatsHandler::default())
1192            .unwrap_err();
1193        assert_matches!(err, error::Error::HandlerNotFound { .. });
1194
1195        let err = builder
1196            .replace_handler("NotExists", CollectStatsHandler::default())
1197            .unwrap_err();
1198        assert_matches!(err, error::Error::HandlerNotFound { .. });
1199    }
1200
1201    #[tokio::test]
1202    async fn test_pusher_drop() {
1203        let (tx, _rx) = mpsc::channel(1);
1204        let pusher = Pusher::new(tx);
1205        let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1206
1207        drop(pusher);
1208        deregister_signal_tx.changed().await.unwrap();
1209    }
1210}