meta_srv/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23    HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, ResponseHeader, Role,
24    PROTOCOL_VERSION,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28    CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29    CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::{Instruction, InstructionReply};
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{oneshot, watch, Notify, RwLock};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63    BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89    fn is_acceptable(&self, role: Role) -> bool;
90
91    fn name(&self) -> &'static str {
92        let type_name = std::any::type_name::<Self>();
93        // short name
94        type_name.split("::").last().unwrap_or(type_name)
95    }
96
97    async fn handle(
98        &self,
99        req: &HeartbeatRequest,
100        ctx: &mut Context,
101        acc: &mut HeartbeatAccumulator,
102    ) -> Result<HandleControl>;
103}
104
/// HandleControl
///
/// Controls process of handling heartbeat request: the value returned by a
/// handler tells the handler group whether to keep running the remaining
/// handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleControl {
    /// Proceed to the next acceptable handler.
    Continue,
    /// Stop here; no further handlers are invoked for this request.
    Done,
}
113
/// Mutable state shared by all handlers while one heartbeat request is processed.
///
/// A fresh (default) accumulator is created per request; once the handlers have
/// run, `header` and `region_lease` are moved into the [`HeartbeatResponse`].
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
    /// Header of the response under construction.
    pub header: Option<ResponseHeader>,
    /// Instructions collected by handlers.
    // NOTE(review): not read anywhere in this file — confirm where these are consumed.
    pub instructions: Vec<Instruction>,
    /// Stat of the heartbeat sender; presumably filled in by a handler — TODO confirm which.
    pub stat: Option<Stat>,
    /// Ids of regions considered inactive.
    // NOTE(review): producer and consumer are outside this file — verify.
    pub inactive_region_ids: HashSet<RegionId>,
    /// Region lease to return to the sender, if any.
    pub region_lease: Option<RegionLease>,
}
122
impl HeartbeatAccumulator {
    /// Converts the accumulator into a [`MailboxMessage`].
    ///
    /// Not implemented yet: this currently discards `self` and always returns `None`.
    pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
        // TODO(jiachun): to HeartbeatResponse payload
        None
    }
}
129
/// Identifies a heartbeat [`Pusher`] by the role and id of its peer.
#[derive(Copy, Clone)]
pub struct PusherId {
    /// Role of the peer.
    pub role: Role,
    /// Id of the peer.
    // NOTE(review): presumably unique per role — verify against callers.
    pub id: u64,
}
135
136impl Debug for PusherId {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(f, "{:?}-{}", self.role, self.id)
139    }
140}
141
142impl Display for PusherId {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        write!(f, "{:?}-{}", self.role, self.id)
145    }
146}
147
148impl PusherId {
149    pub fn new(role: Role, id: u64) -> Self {
150        Self { role, id }
151    }
152
153    pub fn string_key(&self) -> String {
154        format!("{}-{}", self.role as i32, self.id)
155    }
156}
157
/// The receiver of the deregister signal.
///
/// The watched value starts as `false` and is set to `true` when the
/// corresponding [`Pusher`] is dropped.
pub type DeregisterSignalReceiver = watch::Receiver<bool>;
160
/// The pusher of the heartbeat response.
///
/// Wraps the channel through which [`HeartbeatResponse`]s are delivered to one
/// peer, plus a watch channel announcing when this pusher is dropped.
pub struct Pusher {
    /// Channel the responses are sent through.
    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
    // The sender of the deregister signal.
    // The initial value is `false`, meaning the pusher is not deregistered.
    // When the pusher is dropped, `true` is sent to all receivers.
    deregister_signal_sender: watch::Sender<bool>,
    /// Receiver side of the deregister signal; cloned for callers that push through this pusher.
    deregister_signal_receiver: DeregisterSignalReceiver,

    /// Header attached to every pushed response (cloned by `header()`).
    res_header: ResponseHeader,
}
172
impl Drop for Pusher {
    /// Sends `true` on the deregister channel so that holders of a
    /// [`DeregisterSignalReceiver`] learn that this pusher is gone.
    fn drop(&mut self) {
        // Ignore the error here.
        // If all the receivers have been dropped, nobody cares about the deregister signal.
        let _ = self.deregister_signal_sender.send(true);
    }
}
180
181impl Pusher {
182    pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
183        let res_header = ResponseHeader {
184            protocol_version: PROTOCOL_VERSION,
185            ..Default::default()
186        };
187        let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
188        Self {
189            sender,
190            deregister_signal_sender,
191            deregister_signal_receiver,
192            res_header,
193        }
194    }
195
196    #[inline]
197    pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
198        self.sender.send(Ok(res)).await.map_err(|e| {
199            error::PushMessageSnafu {
200                err_msg: e.to_string(),
201            }
202            .build()
203        })
204    }
205
206    #[inline]
207    pub fn header(&self) -> ResponseHeader {
208        self.res_header.clone()
209    }
210}
211
/// The group of heartbeat pushers.
///
/// A cheaply clonable handle to a shared, ordered map from
/// [`PusherId::string_key`] to [`Pusher`]; all clones see the same map.
#[derive(Clone, Default)]
pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
215
216impl Pushers {
217    async fn push(
218        &self,
219        pusher_id: PusherId,
220        mailbox_message: MailboxMessage,
221    ) -> Result<DeregisterSignalReceiver> {
222        let pusher_id = pusher_id.string_key();
223        let pushers = self.0.read().await;
224        let pusher = pushers
225            .get(&pusher_id)
226            .context(error::PusherNotFoundSnafu { pusher_id })?;
227
228        pusher
229            .push(HeartbeatResponse {
230                header: Some(pusher.header()),
231                mailbox_message: Some(mailbox_message),
232                ..Default::default()
233            })
234            .await?;
235
236        Ok(pusher.deregister_signal_receiver.clone())
237    }
238
239    async fn broadcast(
240        &self,
241        range: Range<String>,
242        mailbox_message: &MailboxMessage,
243    ) -> Result<()> {
244        let pushers = self.0.read().await;
245        let pushers = pushers
246            .range(range)
247            .map(|(_, value)| value)
248            .collect::<Vec<_>>();
249        let mut results = Vec::with_capacity(pushers.len());
250
251        for pusher in pushers {
252            let mut mailbox_message = mailbox_message.clone();
253            mailbox_message.id = 0; // one-way message
254
255            results.push(pusher.push(HeartbeatResponse {
256                header: Some(pusher.header()),
257                mailbox_message: Some(mailbox_message),
258                ..Default::default()
259            }))
260        }
261
262        // Checks the error out of the loop.
263        let _ = join_all(results)
264            .await
265            .into_iter()
266            .collect::<Result<Vec<_>>>()?;
267
268        Ok(())
269    }
270
271    pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
272        self.0.write().await.insert(pusher_id, pusher)
273    }
274
275    async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
276        self.0.write().await.remove(pusher_id)
277    }
278}
279
/// A [`HeartbeatHandler`] stored together with its name.
///
/// The name is computed once at construction, so looking a handler up by name
/// or labelling metrics does not call [`HeartbeatHandler::name`] again.
#[derive(Clone)]
pub struct NameCachedHandler {
    /// Cached result of [`HeartbeatHandler::name`].
    name: &'static str,
    /// The wrapped handler.
    handler: Arc<dyn HeartbeatHandler>,
}
285
286impl NameCachedHandler {
287    fn new(handler: impl HeartbeatHandler + 'static) -> Self {
288        let name = handler.name();
289        let handler = Arc::new(handler);
290        Self { name, handler }
291    }
292}
293
/// Shared, reference-counted handle to a [`HeartbeatHandlerGroup`].
pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
295
/// The group of heartbeat handlers.
///
/// Holds the ordered handler pipeline run for each heartbeat request and the
/// [`Pushers`] used to push responses to connected peers.
#[derive(Default, Clone)]
pub struct HeartbeatHandlerGroup {
    /// Handlers, in the order they are run.
    handlers: Vec<NameCachedHandler>,
    /// Registered heartbeat response pushers.
    pushers: Pushers,
}
302
303impl HeartbeatHandlerGroup {
304    /// Registers the heartbeat response [`Pusher`] with the given key to the group.
305    pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
306        METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
307        info!("Pusher register: {}", pusher_id);
308        let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
309    }
310
311    /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
312    ///
313    /// Returns the [`Pusher`] if it exists.
314    pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
315        METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
316        info!("Pusher unregister: {}", pusher_id);
317        self.pushers.remove(&pusher_id.string_key()).await
318    }
319
320    /// Returns the [`Pushers`] of the group.
321    pub fn pushers(&self) -> Pushers {
322        self.pushers.clone()
323    }
324
325    /// Handles the heartbeat request.
326    pub async fn handle(
327        &self,
328        req: HeartbeatRequest,
329        mut ctx: Context,
330    ) -> Result<HeartbeatResponse> {
331        let mut acc = HeartbeatAccumulator::default();
332        let role = req
333            .header
334            .as_ref()
335            .and_then(|h| Role::try_from(h.role).ok())
336            .context(error::InvalidArgumentsSnafu {
337                err_msg: format!("invalid role: {:?}", req.header),
338            })?;
339
340        for NameCachedHandler { name, handler } in self.handlers.iter() {
341            if !handler.is_acceptable(role) {
342                continue;
343            }
344
345            let _timer = METRIC_META_HANDLER_EXECUTE
346                .with_label_values(&[*name])
347                .start_timer();
348
349            if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
350                break;
351            }
352        }
353        let header = std::mem::take(&mut acc.header);
354        let res = HeartbeatResponse {
355            header,
356            region_lease: acc.region_lease,
357            ..Default::default()
358        };
359        Ok(res)
360    }
361}
362
/// A mailbox that delivers messages through heartbeat [`Pushers`] and matches
/// replies to pending requests by message id.
pub struct HeartbeatMailbox {
    /// Pushers used to deliver outgoing messages.
    pushers: Pushers,
    /// Source of message ids.
    sequence: Sequence,
    /// Reply slots of in-flight requests, keyed by message id.
    senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
    /// Deadlines of in-flight requests, keyed by message id.
    timeouts: DashMap<MessageId, Instant>,
    /// Wakes the background timeout checker when a deadline has been registered.
    timeout_notify: Notify,
}
370
371impl HeartbeatMailbox {
372    pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
373        let Payload::Json(payload) =
374            msg.payload
375                .as_ref()
376                .with_context(|| UnexpectedInstructionReplySnafu {
377                    mailbox_message: msg.to_string(),
378                    reason: format!("empty payload, msg: {msg:?}"),
379                })?;
380        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
381    }
382
383    /// Parses the [Instruction] from [MailboxMessage].
384    #[cfg(test)]
385    pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
386        let Payload::Json(payload) =
387            msg.payload
388                .as_ref()
389                .with_context(|| UnexpectedInstructionReplySnafu {
390                    mailbox_message: msg.to_string(),
391                    reason: format!("empty payload, msg: {msg:?}"),
392                })?;
393        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
394    }
395
396    pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
397        let mailbox = Arc::new(Self::new(pushers, sequence));
398
399        let timeout_checker = mailbox.clone();
400        let _handle = common_runtime::spawn_global(async move {
401            timeout_checker.check_timeout_bg(10).await;
402        });
403
404        mailbox
405    }
406
407    fn new(pushers: Pushers, sequence: Sequence) -> Self {
408        Self {
409            pushers,
410            sequence,
411            senders: DashMap::default(),
412            timeouts: DashMap::default(),
413            timeout_notify: Notify::new(),
414        }
415    }
416
417    async fn check_timeout_bg(&self, interval_millis: u64) {
418        let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
419
420        loop {
421            let _ = interval.tick().await;
422
423            if self.timeouts.is_empty() {
424                self.timeout_notify.notified().await;
425            }
426
427            let now = Instant::now();
428            let timeout_ids = self
429                .timeouts
430                .iter()
431                .filter_map(|entry| {
432                    let (id, deadline) = entry.pair();
433                    if deadline < &now {
434                        Some(*id)
435                    } else {
436                        None
437                    }
438                })
439                .collect::<Vec<_>>();
440
441            for id in timeout_ids {
442                let _ = self
443                    .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
444                    .await;
445            }
446        }
447    }
448
449    #[inline]
450    async fn next_message_id(&self) -> Result<u64> {
451        // In this implementation, we pre-occupy the message_id of 0,
452        // and we use `message_id = 0` to mark a Message as a one-way call.
453        loop {
454            let next = self
455                .sequence
456                .next()
457                .await
458                .context(error::NextSequenceSnafu)?;
459            if next > 0 {
460                return Ok(next);
461            }
462        }
463    }
464}
465
466#[async_trait::async_trait]
467impl Mailbox for HeartbeatMailbox {
468    async fn send(
469        &self,
470        ch: &Channel,
471        mut msg: MailboxMessage,
472        timeout: Duration,
473    ) -> Result<MailboxReceiver> {
474        let message_id = self.next_message_id().await?;
475        msg.id = message_id;
476
477        let pusher_id = ch.pusher_id();
478        debug!("Sending mailbox message {msg:?} to {pusher_id}");
479
480        let (tx, rx) = oneshot::channel();
481        let _ = self.senders.insert(message_id, tx);
482        let deadline = Instant::now() + timeout;
483        self.timeouts.insert(message_id, deadline);
484        self.timeout_notify.notify_one();
485
486        let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
487
488        Ok(MailboxReceiver::new(
489            message_id,
490            rx,
491            deregister_signal_receiver,
492            *ch,
493        ))
494    }
495
496    async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
497        let message_id = 0; // one-way message, same as `broadcast`
498        msg.id = message_id;
499
500        let pusher_id = ch.pusher_id();
501        debug!("Sending mailbox message {msg:?} to {pusher_id}");
502
503        self.pushers.push(pusher_id, msg).await?;
504
505        Ok(())
506    }
507
508    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
509        self.pushers.broadcast(ch.pusher_range(), msg).await
510    }
511
512    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
513        debug!("Received mailbox message {maybe_msg:?}");
514
515        let _ = self.timeouts.remove(&id);
516
517        if let Some((_, tx)) = self.senders.remove(&id) {
518            tx.send(maybe_msg)
519                .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
520        } else if let Ok(finally_msg) = maybe_msg {
521            warn!("The response arrived too late: {finally_msg:?}");
522        }
523
524        Ok(())
525    }
526}
527
/// The builder to build the group of heartbeat handlers.
///
/// The optional handlers below are moved into the handler list by
/// `add_default_handlers`, and only when they are set.
pub struct HeartbeatHandlerGroupBuilder {
    /// The handler to handle region failure.
    region_failure_handler: Option<RegionFailureHandler>,

    /// The handler to handle region lease.
    region_lease_handler: Option<RegionLeaseHandler>,

    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    flush_stats_factor: Option<usize>,
    /// A simple handler for flow internal state report
    flow_state_handler: Option<FlowStateHandler>,

    /// The handler to persist stats.
    persist_stats_handler: Option<PersistStatsHandler>,

    /// The plugins; looked up for a publisher in `add_default_handlers` and
    /// for a customizer in `build`.
    plugins: Option<Plugins>,

    /// The heartbeat response pushers.
    pushers: Pushers,

    /// The group of heartbeat handlers, in execution order.
    handlers: Vec<NameCachedHandler>,
}
555
556impl HeartbeatHandlerGroupBuilder {
557    pub fn new(pushers: Pushers) -> Self {
558        Self {
559            region_failure_handler: None,
560            region_lease_handler: None,
561            flush_stats_factor: None,
562            flow_state_handler: None,
563            persist_stats_handler: None,
564            plugins: None,
565            pushers,
566            handlers: vec![],
567        }
568    }
569
570    pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
571        self.flow_state_handler = handler;
572        self
573    }
574
575    pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
576        self.region_lease_handler = handler;
577        self
578    }
579
580    /// Sets the [`RegionFailureHandler`].
581    pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
582        self.region_failure_handler = handler;
583        self
584    }
585
586    /// Sets the flush stats factor.
587    pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
588        self.flush_stats_factor = flush_stats_factor;
589        self
590    }
591
592    pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
593        self.persist_stats_handler = handler;
594        self
595    }
596
597    /// Sets the [`Plugins`].
598    pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
599        self.plugins = plugins;
600        self
601    }
602
603    /// Adds the default handlers.
604    pub fn add_default_handlers(mut self) -> Self {
605        // Extract the `PublishHeartbeatHandler` from the plugins.
606        let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
607            plugins
608                .get::<PublisherRef>()
609                .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
610        } else {
611            None
612        };
613
614        self.add_handler_last(ResponseHeaderHandler);
615        // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
616        // because even if the current meta-server node is no longer the leader it can
617        // still help the datanode to keep lease.
618        self.add_handler_last(DatanodeKeepLeaseHandler);
619        self.add_handler_last(FlownodeKeepLeaseHandler);
620        self.add_handler_last(CheckLeaderHandler);
621        self.add_handler_last(OnLeaderStartHandler);
622        self.add_handler_last(ExtractStatHandler);
623        self.add_handler_last(CollectDatanodeClusterInfoHandler);
624        self.add_handler_last(CollectFrontendClusterInfoHandler);
625        self.add_handler_last(CollectFlownodeClusterInfoHandler);
626        self.add_handler_last(MailboxHandler);
627        if let Some(region_lease_handler) = self.region_lease_handler.take() {
628            self.add_handler_last(region_lease_handler);
629        }
630        self.add_handler_last(FilterInactiveRegionStatsHandler);
631        if let Some(region_failure_handler) = self.region_failure_handler.take() {
632            self.add_handler_last(region_failure_handler);
633        }
634        if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
635            self.add_handler_last(publish_heartbeat_handler);
636        }
637        self.add_handler_last(CollectLeaderRegionHandler);
638        self.add_handler_last(CollectTopicStatsHandler);
639        // Persist stats handler should be in front of collect stats handler.
640        // Because collect stats handler will consume the stats from the accumulator.
641        if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
642            self.add_handler_last(persist_stats_handler);
643        }
644        self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
645        self.add_handler_last(RemapFlowPeerHandler::default());
646
647        if let Some(flow_state_handler) = self.flow_state_handler.take() {
648            self.add_handler_last(flow_state_handler);
649        }
650
651        self
652    }
653
654    /// Builds the group of heartbeat handlers.
655    ///
656    /// Applies the customizer if it exists.
657    pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
658        if let Some(customizer) = self
659            .plugins
660            .as_ref()
661            .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
662        {
663            debug!("Customizing the heartbeat handler group builder");
664            customizer.customize(&mut self)?;
665        }
666
667        Ok(HeartbeatHandlerGroup {
668            handlers: self.handlers,
669            pushers: self.pushers,
670        })
671    }
672
673    fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
674        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
675            self.handlers.insert(pos + 1, handler);
676            return Ok(());
677        }
678
679        error::HandlerNotFoundSnafu { name: target }.fail()
680    }
681
682    /// Adds the handler after the specified handler.
683    pub fn add_handler_after(
684        &mut self,
685        target: &'static str,
686        handler: impl HeartbeatHandler + 'static,
687    ) -> Result<()> {
688        self.add_handler_after_inner(target, NameCachedHandler::new(handler))
689    }
690
691    fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
692        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
693            self.handlers.insert(pos, handler);
694            return Ok(());
695        }
696
697        error::HandlerNotFoundSnafu { name: target }.fail()
698    }
699
700    /// Adds the handler before the specified handler.
701    pub fn add_handler_before(
702        &mut self,
703        target: &'static str,
704        handler: impl HeartbeatHandler + 'static,
705    ) -> Result<()> {
706        self.add_handler_before_inner(target, NameCachedHandler::new(handler))
707    }
708
709    fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
710        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
711            self.handlers[pos] = handler;
712            return Ok(());
713        }
714
715        error::HandlerNotFoundSnafu { name: target }.fail()
716    }
717
718    /// Replaces the handler with the specified name.
719    pub fn replace_handler(
720        &mut self,
721        target: &'static str,
722        handler: impl HeartbeatHandler + 'static,
723    ) -> Result<()> {
724        self.replace_handler_inner(target, NameCachedHandler::new(handler))
725    }
726
727    fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
728        self.handlers.push(handler);
729    }
730
731    fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
732        self.add_handler_last_inner(NameCachedHandler::new(handler));
733    }
734}
735
/// Shared handle to a [`HeartbeatHandlerGroupBuilderCustomizer`]; this is the
/// type looked up in the plugins when the handler group is built.
pub type HeartbeatHandlerGroupBuilderCustomizerRef =
    Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
738
/// A deferred modification of the handler list of a
/// [`HeartbeatHandlerGroupBuilder`].
///
/// `target` is the name of an already registered handler.
pub enum CustomizeHeartbeatGroupAction {
    /// Insert `handler` right after the handler named `target`.
    AddHandlerAfter {
        target: String,
        handler: NameCachedHandler,
    },
    /// Insert `handler` right before the handler named `target`.
    AddHandlerBefore {
        target: String,
        handler: NameCachedHandler,
    },
    /// Replace the handler named `target` with `handler`.
    ReplaceHandler {
        target: String,
        handler: NameCachedHandler,
    },
    /// Append `handler` to the end of the handler list.
    AddHandlerLast {
        handler: NameCachedHandler,
    },
}
756
757impl CustomizeHeartbeatGroupAction {
758    pub fn new_add_handler_after(
759        target: &'static str,
760        handler: impl HeartbeatHandler + 'static,
761    ) -> Self {
762        Self::AddHandlerAfter {
763            target: target.to_string(),
764            handler: NameCachedHandler::new(handler),
765        }
766    }
767
768    pub fn new_add_handler_before(
769        target: &'static str,
770        handler: impl HeartbeatHandler + 'static,
771    ) -> Self {
772        Self::AddHandlerBefore {
773            target: target.to_string(),
774            handler: NameCachedHandler::new(handler),
775        }
776    }
777
778    pub fn new_replace_handler(
779        target: &'static str,
780        handler: impl HeartbeatHandler + 'static,
781    ) -> Self {
782        Self::ReplaceHandler {
783            target: target.to_string(),
784            handler: NameCachedHandler::new(handler),
785        }
786    }
787
788    pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
789        Self::AddHandlerLast {
790            handler: NameCachedHandler::new(handler),
791        }
792    }
793}
794
/// The customizer of the [`HeartbeatHandlerGroupBuilder`].
///
/// A customizer found in the plugins is invoked by
/// `HeartbeatHandlerGroupBuilder::build` before the group is created.
pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
    /// Applies the customization to `builder`.
    ///
    /// An error returned here makes the build fail.
    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;

    /// Registers `action` with the customizer.
    // NOTE(review): presumably applied by a later `customize` call — implementation-defined.
    fn add_action(&self, action: CustomizeHeartbeatGroupAction);
}
801
/// A customizer that queues [`CustomizeHeartbeatGroupAction`]s and applies
/// them, in insertion order, when `customize` is called.
#[derive(Default)]
pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
    /// Actions queued by `add_action` and not yet applied.
    actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
}
806
807impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
808    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
809        info!("Customizing the heartbeat handler group builder");
810        let mut actions = self.actions.lock().unwrap();
811        for action in actions.drain(..) {
812            match action {
813                CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
814                    builder.add_handler_after_inner(&target, handler)?;
815                }
816                CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
817                    builder.add_handler_before_inner(&target, handler)?;
818                }
819                CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
820                    builder.replace_handler_inner(&target, handler)?;
821                }
822                CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
823                    builder.add_handler_last_inner(handler);
824                }
825            }
826        }
827        Ok(())
828    }
829
830    fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
831        self.actions.lock().unwrap().push(action);
832    }
833}
834
835#[cfg(test)]
836mod tests {
837
838    use std::assert_matches::assert_matches;
839    use std::sync::Arc;
840    use std::time::Duration;
841
842    use api::v1::meta::{MailboxMessage, Role};
843    use common_meta::kv_backend::memory::MemoryKvBackend;
844    use common_meta::sequence::SequenceBuilder;
845    use tokio::sync::mpsc;
846
847    use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
848    use crate::error;
849    use crate::handler::collect_stats_handler::CollectStatsHandler;
850    use crate::handler::response_header_handler::ResponseHeaderHandler;
851    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
852    use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
853
854    #[tokio::test]
855    async fn test_mailbox() {
856        let (mailbox, receiver) = push_msg_via_mailbox().await;
857        let id = receiver.message_id();
858
859        let resp_msg = MailboxMessage {
860            id,
861            subject: "resp-test".to_string(),
862            timestamp_millis: 456,
863            ..Default::default()
864        };
865
866        mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
867
868        let recv_msg = receiver.await.unwrap();
869        assert_eq!(recv_msg.id, id);
870        assert_eq!(recv_msg.timestamp_millis, 456);
871        assert_eq!(recv_msg.subject, "resp-test".to_string());
872    }
873
    /// Without a reply, the receiver resolves to an error once the request's
    /// timeout (set in `push_msg_via_mailbox`) has elapsed.
    #[tokio::test]
    async fn test_mailbox_timeout() {
        let (_, receiver) = push_msg_via_mailbox().await;
        let res = receiver.await;
        assert!(res.is_err());
    }
880
    /// Test fixture: registers a datanode pusher, sends one request through a
    /// fresh mailbox with a one-second timeout, and checks that the request
    /// shows up on the pusher's channel.
    ///
    /// Returns the mailbox and the receiver for the still pending reply.
    async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
        let datanode_id = 12;
        let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
        let pusher_id = PusherId::new(Role::Datanode, datanode_id);
        let pusher: Pusher = Pusher::new(pusher_tx);
        let handler_group = HeartbeatHandlerGroup::default();
        handler_group.register_pusher(pusher_id, pusher).await;

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let seq = SequenceBuilder::new("test_seq", kv_backend).build();
        let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);

        // The id set here is overwritten by `send` with a freshly allocated one.
        let msg = MailboxMessage {
            id: 0,
            subject: "req-test".to_string(),
            timestamp_millis: 123,
            ..Default::default()
        };
        let ch = Channel::Datanode(datanode_id);

        let receiver = mailbox
            .send(&ch, msg, Duration::from_secs(1))
            .await
            .unwrap();

        // The request must have been pushed to the datanode's channel.
        let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
        let message = recv_obj.mailbox_message.unwrap();
        assert_eq!(message.timestamp_millis, 123);
        assert_eq!(message.subject, "req-test".to_string());

        (mailbox, receiver)
    }
913
/// Checks the names and order of the handlers produced by `add_default_handlers`.
#[test]
fn test_handler_group_builder() {
    // Expected handler names, in execution order.
    let expected = [
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "RemapFlowPeerHandler",
    ];

    let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
        .add_default_handlers()
        .build()
        .unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
944
/// Checks that `add_handler_before` inserts the new handler directly in front of the
/// named handler located in the middle of the default chain.
#[test]
fn test_handler_group_builder_add_before() {
    // Default chain with an extra `CollectStatsHandler` ahead of
    // `FilterInactiveRegionStatsHandler`.
    let expected = [
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "CollectStatsHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "RemapFlowPeerHandler",
    ];

    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
    builder
        .add_handler_before(
            "FilterInactiveRegionStatsHandler",
            CollectStatsHandler::default(),
        )
        .unwrap();
    let group = builder.build().unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
981
/// Checks that `add_handler_before` targeting the first default handler puts the new
/// handler at the very front of the chain.
#[test]
fn test_handler_group_builder_add_before_first() {
    // Default chain, prefixed by the newly added `CollectStatsHandler`.
    let expected = [
        "CollectStatsHandler",
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "RemapFlowPeerHandler",
    ];

    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
    builder
        .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
        .unwrap();
    let group = builder.build().unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
1015
/// Checks that `add_handler_after` inserts the new handler directly behind the named
/// handler located in the middle of the default chain.
#[test]
fn test_handler_group_builder_add_after() {
    // Default chain with an extra `CollectStatsHandler` right after `MailboxHandler`.
    let expected = [
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "CollectStatsHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "RemapFlowPeerHandler",
    ];

    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
    builder
        .add_handler_after("MailboxHandler", CollectStatsHandler::default())
        .unwrap();
    let group = builder.build().unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
1049
/// Checks that `add_handler_after` targeting `CollectStatsHandler` places the new handler
/// immediately behind it, i.e. between `CollectStatsHandler` and `RemapFlowPeerHandler`.
#[test]
fn test_handler_group_builder_add_after_last() {
    // Default chain with a second `ResponseHeaderHandler` following `CollectStatsHandler`.
    let expected = [
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "ResponseHeaderHandler",
        "RemapFlowPeerHandler",
    ];

    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
    builder
        .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
        .unwrap();
    let group = builder.build().unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
1083
/// Checks that `replace_handler` swaps a handler in the middle of the default chain
/// without changing the position of any other handler.
#[test]
fn test_handler_group_builder_replace() {
    // Default chain with `MailboxHandler` replaced by a `CollectStatsHandler`.
    let expected = [
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "CollectStatsHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "RemapFlowPeerHandler",
    ];

    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
    builder
        .replace_handler("MailboxHandler", CollectStatsHandler::default())
        .unwrap();
    let group = builder.build().unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
1117
/// Checks that `replace_handler` targeting `CollectStatsHandler` swaps it in place and
/// leaves the trailing `RemapFlowPeerHandler` where it was.
#[test]
fn test_handler_group_builder_replace_last() {
    // Default chain with `CollectStatsHandler` replaced by a `ResponseHeaderHandler`.
    let expected = [
        "ResponseHeaderHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "ResponseHeaderHandler",
        "RemapFlowPeerHandler",
    ];

    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
    builder
        .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
        .unwrap();
    let group = builder.build().unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
1151
/// Checks that `replace_handler` targeting the first default handler swaps it in place.
#[test]
fn test_handler_group_builder_replace_first() {
    // Default chain with the leading `ResponseHeaderHandler` replaced by a
    // `CollectStatsHandler`.
    let expected = [
        "CollectStatsHandler",
        "DatanodeKeepLeaseHandler",
        "FlownodeKeepLeaseHandler",
        "CheckLeaderHandler",
        "OnLeaderStartHandler",
        "ExtractStatHandler",
        "CollectDatanodeClusterInfoHandler",
        "CollectFrontendClusterInfoHandler",
        "CollectFlownodeClusterInfoHandler",
        "MailboxHandler",
        "FilterInactiveRegionStatsHandler",
        "CollectLeaderRegionHandler",
        "CollectTopicStatsHandler",
        "CollectStatsHandler",
        "RemapFlowPeerHandler",
    ];

    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
    builder
        .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
        .unwrap();
    let group = builder.build().unwrap();

    // Compare lengths first: `zip` alone would silently ignore a surplus on either side.
    assert_eq!(expected.len(), group.handlers.len());
    group
        .handlers
        .iter()
        .zip(expected)
        .for_each(|(handler, name)| assert_eq!(handler.name, name));
}
1184
/// Checks that `add_handler_before`, `add_handler_after` and `replace_handler` each fail
/// with `Error::HandlerNotFound` when the target handler name is not in the chain.
#[test]
fn test_handler_group_builder_handler_not_found() {
    let mut builder =
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();

    // Insertion in front of an unknown handler.
    assert_matches!(
        builder
            .add_handler_before("NotExists", CollectStatsHandler::default())
            .unwrap_err(),
        error::Error::HandlerNotFound { .. }
    );

    // Insertion behind an unknown handler.
    assert_matches!(
        builder
            .add_handler_after("NotExists", CollectStatsHandler::default())
            .unwrap_err(),
        error::Error::HandlerNotFound { .. }
    );

    // Replacement of an unknown handler.
    assert_matches!(
        builder
            .replace_handler("NotExists", CollectStatsHandler::default())
            .unwrap_err(),
        error::Error::HandlerNotFound { .. }
    );
}
1204
1205    #[tokio::test]
1206    async fn test_pusher_drop() {
1207        let (tx, _rx) = mpsc::channel(1);
1208        let pusher = Pusher::new(tx);
1209        let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1210
1211        drop(pusher);
1212        deregister_signal_tx.changed().await.unwrap();
1213    }
1214}