Skip to main content

meta_srv/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23    HeartbeatRequest, HeartbeatResponse, MailboxMessage, PROTOCOL_VERSION, RegionLease,
24    ResponseHeader, Role,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28    CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29    CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::InstructionReply;
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{Notify, RwLock, oneshot, watch};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63    BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89    fn is_acceptable(&self, role: Role) -> bool;
90
91    fn name(&self) -> &'static str {
92        let type_name = std::any::type_name::<Self>();
93        // short name
94        type_name.split("::").last().unwrap_or(type_name)
95    }
96
97    async fn handle(
98        &self,
99        req: &HeartbeatRequest,
100        ctx: &mut Context,
101        acc: &mut HeartbeatAccumulator,
102    ) -> Result<HandleControl>;
103}
104
105/// HandleControl
106///
107/// Controls process of handling heartbeat request.
108#[derive(PartialEq, Debug)]
109pub enum HandleControl {
110    Continue,
111    Done,
112}
113
114#[derive(Debug, Default)]
115pub struct HeartbeatAccumulator {
116    pub header: Option<ResponseHeader>,
117    mailbox_message: Option<MailboxMessage>,
118    pub stat: Option<Stat>,
119    pub inactive_region_ids: HashSet<RegionId>,
120    pub region_lease: Option<RegionLease>,
121}
122
123impl HeartbeatAccumulator {
124    pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
125        self.mailbox_message.take()
126    }
127
128    pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
129        let _ = self.mailbox_message.insert(message);
130    }
131}
132
133#[derive(Copy, Clone)]
134pub struct PusherId {
135    pub role: Role,
136    pub id: u64,
137}
138
139impl Debug for PusherId {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        write!(f, "{:?}-{}", self.role, self.id)
142    }
143}
144
145impl Display for PusherId {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        write!(f, "{:?}-{}", self.role, self.id)
148    }
149}
150
151impl PusherId {
152    pub fn new(role: Role, id: u64) -> Self {
153        Self { role, id }
154    }
155
156    pub fn string_key(&self) -> String {
157        format!("{}-{}", self.role as i32, self.id)
158    }
159}
160
161/// The receiver of the deregister signal.
162pub type DeregisterSignalReceiver = watch::Receiver<bool>;
163
164/// The pusher of the heartbeat response.
165pub struct Pusher {
166    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
167    // The sender of the deregister signal.
168    // default is false, means the pusher is not deregistered.
169    // when the pusher is deregistered, the sender will be notified.
170    deregister_signal_sender: watch::Sender<bool>,
171    deregister_signal_receiver: DeregisterSignalReceiver,
172
173    res_header: ResponseHeader,
174}
175
176impl Drop for Pusher {
177    fn drop(&mut self) {
178        // Ignore the error here.
179        // if all the receivers have been dropped, means no body cares the deregister signal.
180        let _ = self.deregister_signal_sender.send(true);
181    }
182}
183
184impl Pusher {
185    pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
186        let res_header = ResponseHeader {
187            protocol_version: PROTOCOL_VERSION,
188            ..Default::default()
189        };
190        let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
191        Self {
192            sender,
193            deregister_signal_sender,
194            deregister_signal_receiver,
195            res_header,
196        }
197    }
198
199    #[inline]
200    pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
201        self.sender.send(Ok(res)).await.map_err(|e| {
202            error::PushMessageSnafu {
203                err_msg: e.to_string(),
204            }
205            .build()
206        })
207    }
208
209    #[inline]
210    pub fn header(&self) -> ResponseHeader {
211        self.res_header.clone()
212    }
213}
214
215/// The group of heartbeat pushers.
216#[derive(Clone, Default)]
217pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
218
219impl Pushers {
220    async fn push(
221        &self,
222        pusher_id: PusherId,
223        mailbox_message: MailboxMessage,
224    ) -> Result<DeregisterSignalReceiver> {
225        let pusher_id = pusher_id.string_key();
226        let pushers = self.0.read().await;
227        let pusher = pushers
228            .get(&pusher_id)
229            .context(error::PusherNotFoundSnafu { pusher_id })?;
230
231        pusher
232            .push(HeartbeatResponse {
233                header: Some(pusher.header()),
234                mailbox_message: Some(mailbox_message),
235                ..Default::default()
236            })
237            .await?;
238
239        Ok(pusher.deregister_signal_receiver.clone())
240    }
241
242    async fn broadcast(
243        &self,
244        range: Range<String>,
245        mailbox_message: &MailboxMessage,
246    ) -> Result<()> {
247        let pushers = self.0.read().await;
248        let pushers = pushers
249            .range(range)
250            .map(|(_, value)| value)
251            .collect::<Vec<_>>();
252        let mut results = Vec::with_capacity(pushers.len());
253
254        for pusher in pushers {
255            let mut mailbox_message = mailbox_message.clone();
256            mailbox_message.id = 0; // one-way message
257
258            results.push(pusher.push(HeartbeatResponse {
259                header: Some(pusher.header()),
260                mailbox_message: Some(mailbox_message),
261                ..Default::default()
262            }))
263        }
264
265        // Checks the error out of the loop.
266        let _ = join_all(results)
267            .await
268            .into_iter()
269            .collect::<Result<Vec<_>>>()?;
270
271        Ok(())
272    }
273
274    pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
275        self.0.write().await.insert(pusher_id, pusher)
276    }
277
278    async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
279        self.0.write().await.remove(pusher_id)
280    }
281}
282
283#[derive(Clone)]
284pub struct NameCachedHandler {
285    name: &'static str,
286    handler: Arc<dyn HeartbeatHandler>,
287}
288
289impl NameCachedHandler {
290    fn new(handler: impl HeartbeatHandler + 'static) -> Self {
291        let name = handler.name();
292        let handler = Arc::new(handler);
293        Self { name, handler }
294    }
295}
296
297pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
298
299/// The group of heartbeat handlers.
300#[derive(Default, Clone)]
301pub struct HeartbeatHandlerGroup {
302    handlers: Vec<NameCachedHandler>,
303    pushers: Pushers,
304}
305
306impl HeartbeatHandlerGroup {
307    /// Registers the heartbeat response [`Pusher`] with the given key to the group.
308    pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
309        METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
310        info!("Pusher register: {}", pusher_id);
311        let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
312    }
313
314    /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
315    pub async fn deregister_push(&self, pusher_id: PusherId) {
316        if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
317            info!("Pusher unregister: {}", pusher_id);
318            METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
319        }
320    }
321
322    #[cfg(test)]
323    /// Returns whether the group contains the heartbeat response [`Pusher`] with the given key.
324    pub async fn contains_pusher(&self, pusher_id: &PusherId) -> bool {
325        let pushers = self.pushers.0.read().await;
326        pushers.contains_key(&pusher_id.string_key())
327    }
328
329    /// Returns the [`Pushers`] of the group.
330    pub fn pushers(&self) -> Pushers {
331        self.pushers.clone()
332    }
333
334    /// Handles the heartbeat request.
335    pub async fn handle(
336        &self,
337        req: HeartbeatRequest,
338        mut ctx: Context,
339    ) -> Result<HeartbeatResponse> {
340        let mut acc = HeartbeatAccumulator::default();
341        let role = req
342            .header
343            .as_ref()
344            .and_then(|h| Role::try_from(h.role).ok())
345            .context(error::InvalidArgumentsSnafu {
346                err_msg: format!("invalid role: {:?}", req.header),
347            })?;
348
349        let is_handshake = ctx.is_handshake;
350
351        for NameCachedHandler { name, handler } in self.handlers.iter() {
352            if !handler.is_acceptable(role) {
353                continue;
354            }
355
356            let _timer = METRIC_META_HANDLER_EXECUTE
357                .with_label_values(&[*name])
358                .start_timer();
359
360            if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
361                break;
362            }
363        }
364        let header = std::mem::take(&mut acc.header);
365        let mailbox_message = acc.take_mailbox_message();
366
367        // Populate heartbeat_config during handshake
368        let heartbeat_config = if is_handshake {
369            let config = ctx.heartbeat_options_for(role).into();
370
371            info!(
372                "Handshake with {:?} node, sending config: {:?}",
373                role, config
374            );
375
376            Some(config)
377        } else {
378            None
379        };
380
381        let res = HeartbeatResponse {
382            header,
383            region_lease: acc.region_lease,
384            mailbox_message,
385            heartbeat_config,
386        };
387        Ok(res)
388    }
389}
390
391pub struct HeartbeatMailbox {
392    pushers: Pushers,
393    sequence: Sequence,
394    senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
395    timeouts: DashMap<MessageId, Instant>,
396    timeout_notify: Notify,
397}
398
399impl HeartbeatMailbox {
400    pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
401        let Payload::Json(payload) =
402            msg.payload
403                .as_ref()
404                .with_context(|| UnexpectedInstructionReplySnafu {
405                    mailbox_message: msg.to_string(),
406                    reason: format!("empty payload, msg: {msg:?}"),
407                })?;
408        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
409    }
410
411    /// Parses the [Instruction] from [MailboxMessage].
412    #[cfg(test)]
413    pub(crate) fn json_instruction(
414        msg: &MailboxMessage,
415    ) -> Result<common_meta::instruction::Instruction> {
416        let Payload::Json(payload) =
417            msg.payload
418                .as_ref()
419                .with_context(|| UnexpectedInstructionReplySnafu {
420                    mailbox_message: msg.to_string(),
421                    reason: format!("empty payload, msg: {msg:?}"),
422                })?;
423        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
424    }
425
426    pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
427        let mailbox = Arc::new(Self::new(pushers, sequence));
428
429        let timeout_checker = mailbox.clone();
430        let _handle = common_runtime::spawn_global(async move {
431            timeout_checker.check_timeout_bg(10).await;
432        });
433
434        mailbox
435    }
436
437    fn new(pushers: Pushers, sequence: Sequence) -> Self {
438        Self {
439            pushers,
440            sequence,
441            senders: DashMap::default(),
442            timeouts: DashMap::default(),
443            timeout_notify: Notify::new(),
444        }
445    }
446
447    async fn check_timeout_bg(&self, interval_millis: u64) {
448        let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
449
450        loop {
451            let _ = interval.tick().await;
452
453            if self.timeouts.is_empty() {
454                self.timeout_notify.notified().await;
455            }
456
457            let now = Instant::now();
458            let timeout_ids = self
459                .timeouts
460                .iter()
461                .filter_map(|entry| {
462                    let (id, deadline) = entry.pair();
463                    if deadline < &now { Some(*id) } else { None }
464                })
465                .collect::<Vec<_>>();
466
467            for id in timeout_ids {
468                let _ = self
469                    .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
470                    .await;
471            }
472        }
473    }
474
475    #[inline]
476    async fn next_message_id(&self) -> Result<u64> {
477        // In this implementation, we pre-occupy the message_id of 0,
478        // and we use `message_id = 0` to mark a Message as a one-way call.
479        loop {
480            let next = self
481                .sequence
482                .next()
483                .await
484                .context(error::NextSequenceSnafu)?;
485            if next > 0 {
486                return Ok(next);
487            }
488        }
489    }
490}
491
492#[async_trait::async_trait]
493impl Mailbox for HeartbeatMailbox {
494    async fn send(
495        &self,
496        ch: &Channel,
497        mut msg: MailboxMessage,
498        timeout: Duration,
499    ) -> Result<MailboxReceiver> {
500        let message_id = self.next_message_id().await?;
501        msg.id = message_id;
502
503        let pusher_id = ch.pusher_id();
504        debug!("Sending mailbox message {msg:?} to {pusher_id}");
505
506        let (tx, rx) = oneshot::channel();
507        let _ = self.senders.insert(message_id, tx);
508        let deadline = Instant::now() + timeout;
509        self.timeouts.insert(message_id, deadline);
510        self.timeout_notify.notify_one();
511        let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
512
513        Ok(MailboxReceiver::new(
514            message_id,
515            rx,
516            deregister_signal_receiver,
517            *ch,
518        ))
519    }
520
521    async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
522        let message_id = 0; // one-way message, same as `broadcast`
523        msg.id = message_id;
524
525        let pusher_id = ch.pusher_id();
526        debug!("Sending mailbox message {msg:?} to {pusher_id}");
527
528        self.pushers.push(pusher_id, msg).await?;
529
530        Ok(())
531    }
532
533    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
534        self.pushers.broadcast(ch.pusher_range(), msg).await
535    }
536
537    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
538        debug!("Received mailbox message {maybe_msg:?}");
539
540        let _ = self.timeouts.remove(&id);
541
542        if let Some((_, tx)) = self.senders.remove(&id) {
543            tx.send(maybe_msg)
544                .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
545        } else if let Ok(finally_msg) = maybe_msg {
546            warn!("The response arrived too late: {finally_msg:?}");
547        }
548
549        Ok(())
550    }
551}
552
553/// The builder to build the group of heartbeat handlers.
554pub struct HeartbeatHandlerGroupBuilder {
555    /// The handler to handle region failure.
556    region_failure_handler: Option<RegionFailureHandler>,
557
558    /// The handler to handle region lease.
559    region_lease_handler: Option<RegionLeaseHandler>,
560
561    /// The factor that determines how often statistics should be flushed,
562    /// based on the number of received heartbeats. When the number of heartbeats
563    /// reaches this factor, a flush operation is triggered.
564    flush_stats_factor: Option<usize>,
565    /// A simple handler for flow internal state report
566    flow_state_handler: Option<FlowStateHandler>,
567
568    /// The handler to persist stats.
569    persist_stats_handler: Option<PersistStatsHandler>,
570
571    /// The plugins.
572    plugins: Option<Plugins>,
573
574    /// The heartbeat response pushers.
575    pushers: Pushers,
576
577    /// The group of heartbeat handlers.
578    handlers: Vec<NameCachedHandler>,
579}
580
581impl HeartbeatHandlerGroupBuilder {
582    pub fn new(pushers: Pushers) -> Self {
583        Self {
584            region_failure_handler: None,
585            region_lease_handler: None,
586            flush_stats_factor: None,
587            flow_state_handler: None,
588            persist_stats_handler: None,
589            plugins: None,
590            pushers,
591            handlers: vec![],
592        }
593    }
594
595    pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
596        self.flow_state_handler = handler;
597        self
598    }
599
600    pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
601        self.region_lease_handler = handler;
602        self
603    }
604
605    /// Sets the [`RegionFailureHandler`].
606    pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
607        self.region_failure_handler = handler;
608        self
609    }
610
611    /// Sets the flush stats factor.
612    pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
613        self.flush_stats_factor = flush_stats_factor;
614        self
615    }
616
617    pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
618        self.persist_stats_handler = handler;
619        self
620    }
621
622    /// Sets the [`Plugins`].
623    pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
624        self.plugins = plugins;
625        self
626    }
627
628    /// Adds the default handlers.
629    pub fn add_default_handlers(mut self) -> Self {
630        // Extract the `PublishHeartbeatHandler` from the plugins.
631        let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
632            plugins
633                .get::<PublisherRef>()
634                .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
635        } else {
636            None
637        };
638
639        self.add_handler_last(ResponseHeaderHandler);
640        // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
641        // because even if the current meta-server node is no longer the leader it can
642        // still help the datanode to keep lease.
643        self.add_handler_last(DatanodeKeepLeaseHandler);
644        self.add_handler_last(FlownodeKeepLeaseHandler);
645        self.add_handler_last(CheckLeaderHandler);
646        self.add_handler_last(OnLeaderStartHandler);
647        self.add_handler_last(ExtractStatHandler);
648        self.add_handler_last(CollectDatanodeClusterInfoHandler);
649        self.add_handler_last(CollectFrontendClusterInfoHandler);
650        self.add_handler_last(CollectFlownodeClusterInfoHandler);
651        self.add_handler_last(MailboxHandler);
652        if let Some(region_lease_handler) = self.region_lease_handler.take() {
653            self.add_handler_last(region_lease_handler);
654        }
655        self.add_handler_last(FilterInactiveRegionStatsHandler);
656        if let Some(region_failure_handler) = self.region_failure_handler.take() {
657            self.add_handler_last(region_failure_handler);
658        }
659        if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
660            self.add_handler_last(publish_heartbeat_handler);
661        }
662        self.add_handler_last(CollectLeaderRegionHandler);
663        self.add_handler_last(CollectTopicStatsHandler);
664        // Persist stats handler should be in front of collect stats handler.
665        // Because collect stats handler will consume the stats from the accumulator.
666        if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
667            self.add_handler_last(persist_stats_handler);
668        }
669        self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
670        self.add_handler_last(RemapFlowPeerHandler::default());
671
672        if let Some(flow_state_handler) = self.flow_state_handler.take() {
673            self.add_handler_last(flow_state_handler);
674        }
675
676        self
677    }
678
679    /// Builds the group of heartbeat handlers.
680    ///
681    /// Applies the customizer if it exists.
682    pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
683        if let Some(customizer) = self
684            .plugins
685            .as_ref()
686            .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
687        {
688            debug!("Customizing the heartbeat handler group builder");
689            customizer.customize(&mut self)?;
690        }
691
692        Ok(HeartbeatHandlerGroup {
693            handlers: self.handlers,
694            pushers: self.pushers,
695        })
696    }
697
698    fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
699        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
700            self.handlers.insert(pos + 1, handler);
701            return Ok(());
702        }
703
704        error::HandlerNotFoundSnafu { name: target }.fail()
705    }
706
707    /// Adds the handler after the specified handler.
708    pub fn add_handler_after(
709        &mut self,
710        target: &'static str,
711        handler: impl HeartbeatHandler + 'static,
712    ) -> Result<()> {
713        self.add_handler_after_inner(target, NameCachedHandler::new(handler))
714    }
715
716    fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
717        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
718            self.handlers.insert(pos, handler);
719            return Ok(());
720        }
721
722        error::HandlerNotFoundSnafu { name: target }.fail()
723    }
724
725    /// Adds the handler before the specified handler.
726    pub fn add_handler_before(
727        &mut self,
728        target: &'static str,
729        handler: impl HeartbeatHandler + 'static,
730    ) -> Result<()> {
731        self.add_handler_before_inner(target, NameCachedHandler::new(handler))
732    }
733
734    fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
735        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
736            self.handlers[pos] = handler;
737            return Ok(());
738        }
739
740        error::HandlerNotFoundSnafu { name: target }.fail()
741    }
742
743    /// Replaces the handler with the specified name.
744    pub fn replace_handler(
745        &mut self,
746        target: &'static str,
747        handler: impl HeartbeatHandler + 'static,
748    ) -> Result<()> {
749        self.replace_handler_inner(target, NameCachedHandler::new(handler))
750    }
751
752    fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
753        self.handlers.push(handler);
754    }
755
756    fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
757        self.add_handler_last_inner(NameCachedHandler::new(handler));
758    }
759}
760
761pub type HeartbeatHandlerGroupBuilderCustomizerRef =
762    Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
763
764pub enum CustomizeHeartbeatGroupAction {
765    AddHandlerAfter {
766        target: String,
767        handler: NameCachedHandler,
768    },
769    AddHandlerBefore {
770        target: String,
771        handler: NameCachedHandler,
772    },
773    ReplaceHandler {
774        target: String,
775        handler: NameCachedHandler,
776    },
777    AddHandlerLast {
778        handler: NameCachedHandler,
779    },
780}
781
782impl CustomizeHeartbeatGroupAction {
783    pub fn new_add_handler_after(
784        target: &'static str,
785        handler: impl HeartbeatHandler + 'static,
786    ) -> Self {
787        Self::AddHandlerAfter {
788            target: target.to_string(),
789            handler: NameCachedHandler::new(handler),
790        }
791    }
792
793    pub fn new_add_handler_before(
794        target: &'static str,
795        handler: impl HeartbeatHandler + 'static,
796    ) -> Self {
797        Self::AddHandlerBefore {
798            target: target.to_string(),
799            handler: NameCachedHandler::new(handler),
800        }
801    }
802
803    pub fn new_replace_handler(
804        target: &'static str,
805        handler: impl HeartbeatHandler + 'static,
806    ) -> Self {
807        Self::ReplaceHandler {
808            target: target.to_string(),
809            handler: NameCachedHandler::new(handler),
810        }
811    }
812
813    pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
814        Self::AddHandlerLast {
815            handler: NameCachedHandler::new(handler),
816        }
817    }
818}
819
820/// The customizer of the [`HeartbeatHandlerGroupBuilder`].
821pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
822    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
823
824    fn add_action(&self, action: CustomizeHeartbeatGroupAction);
825}
826
827#[derive(Default)]
828pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
829    actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
830}
831
832impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
833    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
834        info!("Customizing the heartbeat handler group builder");
835        let mut actions = self.actions.lock().unwrap();
836        for action in actions.drain(..) {
837            match action {
838                CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
839                    builder.add_handler_after_inner(&target, handler)?;
840                }
841                CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
842                    builder.add_handler_before_inner(&target, handler)?;
843                }
844                CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
845                    builder.replace_handler_inner(&target, handler)?;
846                }
847                CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
848                    builder.add_handler_last_inner(handler);
849                }
850            }
851        }
852        Ok(())
853    }
854
855    fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
856        self.actions.lock().unwrap().push(action);
857    }
858}
859
860#[cfg(test)]
861mod tests {
862
863    use std::assert_matches;
864    use std::sync::Arc;
865    use std::time::Duration;
866
867    use api::v1::meta::{MailboxMessage, Role};
868    use common_meta::kv_backend::memory::MemoryKvBackend;
869    use common_meta::sequence::SequenceBuilder;
870    use tokio::sync::mpsc;
871
872    use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
873    use crate::error;
874    use crate::handler::collect_stats_handler::CollectStatsHandler;
875    use crate::handler::response_header_handler::ResponseHeaderHandler;
876    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
877    use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
878
879    #[tokio::test]
880    async fn test_mailbox() {
881        let (mailbox, receiver) = push_msg_via_mailbox().await;
882        let id = receiver.message_id();
883
884        let resp_msg = MailboxMessage {
885            id,
886            subject: "resp-test".to_string(),
887            timestamp_millis: 456,
888            ..Default::default()
889        };
890
891        mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
892
893        let recv_msg = receiver.await.unwrap();
894        assert_eq!(recv_msg.id, id);
895        assert_eq!(recv_msg.timestamp_millis, 456);
896        assert_eq!(recv_msg.subject, "resp-test".to_string());
897    }
898
899    #[tokio::test]
900    async fn test_mailbox_timeout() {
901        let (_, receiver) = push_msg_via_mailbox().await;
902        let res = receiver.await;
903        assert!(res.is_err());
904    }
905
906    async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
907        let datanode_id = 12;
908        let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
909        let pusher_id = PusherId::new(Role::Datanode, datanode_id);
910        let pusher: Pusher = Pusher::new(pusher_tx);
911        let handler_group = HeartbeatHandlerGroup::default();
912        handler_group.register_pusher(pusher_id, pusher).await;
913
914        let kv_backend = Arc::new(MemoryKvBackend::new());
915        let seq = SequenceBuilder::new("test_seq", kv_backend).build();
916        let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
917
918        let msg = MailboxMessage {
919            id: 0,
920            subject: "req-test".to_string(),
921            timestamp_millis: 123,
922            ..Default::default()
923        };
924        let ch = Channel::Datanode(datanode_id);
925
926        let receiver = mailbox
927            .send(&ch, msg, Duration::from_secs(1))
928            .await
929            .unwrap();
930
931        let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
932        let message = recv_obj.mailbox_message.unwrap();
933        assert_eq!(message.timestamp_millis, 123);
934        assert_eq!(message.subject, "req-test".to_string());
935
936        (mailbox, receiver)
937    }
938
939    #[test]
940    fn test_handler_group_builder() {
941        let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
942            .add_default_handlers()
943            .build()
944            .unwrap();
945
946        let handlers = group.handlers;
947        let names = [
948            "ResponseHeaderHandler",
949            "DatanodeKeepLeaseHandler",
950            "FlownodeKeepLeaseHandler",
951            "CheckLeaderHandler",
952            "OnLeaderStartHandler",
953            "ExtractStatHandler",
954            "CollectDatanodeClusterInfoHandler",
955            "CollectFrontendClusterInfoHandler",
956            "CollectFlownodeClusterInfoHandler",
957            "MailboxHandler",
958            "FilterInactiveRegionStatsHandler",
959            "CollectLeaderRegionHandler",
960            "CollectTopicStatsHandler",
961            "CollectStatsHandler",
962            "RemapFlowPeerHandler",
963        ];
964        assert_eq!(names.len(), handlers.len());
965        for (handler, name) in handlers.iter().zip(names) {
966            assert_eq!(handler.name, name);
967        }
968    }
969
970    #[test]
971    fn test_handler_group_builder_add_before() {
972        let mut builder =
973            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
974        builder
975            .add_handler_before(
976                "FilterInactiveRegionStatsHandler",
977                CollectStatsHandler::default(),
978            )
979            .unwrap();
980
981        let group = builder.build().unwrap();
982        let handlers = group.handlers;
983        let names = [
984            "ResponseHeaderHandler",
985            "DatanodeKeepLeaseHandler",
986            "FlownodeKeepLeaseHandler",
987            "CheckLeaderHandler",
988            "OnLeaderStartHandler",
989            "ExtractStatHandler",
990            "CollectDatanodeClusterInfoHandler",
991            "CollectFrontendClusterInfoHandler",
992            "CollectFlownodeClusterInfoHandler",
993            "MailboxHandler",
994            "CollectStatsHandler",
995            "FilterInactiveRegionStatsHandler",
996            "CollectLeaderRegionHandler",
997            "CollectTopicStatsHandler",
998            "CollectStatsHandler",
999            "RemapFlowPeerHandler",
1000        ];
1001        assert_eq!(names.len(), handlers.len());
1002        for (handler, name) in handlers.iter().zip(names) {
1003            assert_eq!(handler.name, name);
1004        }
1005    }
1006
1007    #[test]
1008    fn test_handler_group_builder_add_before_first() {
1009        let mut builder =
1010            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1011        builder
1012            .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
1013            .unwrap();
1014
1015        let group = builder.build().unwrap();
1016        let handlers = group.handlers;
1017        let names = [
1018            "CollectStatsHandler",
1019            "ResponseHeaderHandler",
1020            "DatanodeKeepLeaseHandler",
1021            "FlownodeKeepLeaseHandler",
1022            "CheckLeaderHandler",
1023            "OnLeaderStartHandler",
1024            "ExtractStatHandler",
1025            "CollectDatanodeClusterInfoHandler",
1026            "CollectFrontendClusterInfoHandler",
1027            "CollectFlownodeClusterInfoHandler",
1028            "MailboxHandler",
1029            "FilterInactiveRegionStatsHandler",
1030            "CollectLeaderRegionHandler",
1031            "CollectTopicStatsHandler",
1032            "CollectStatsHandler",
1033            "RemapFlowPeerHandler",
1034        ];
1035        assert_eq!(names.len(), handlers.len());
1036        for (handler, name) in handlers.iter().zip(names) {
1037            assert_eq!(handler.name, name);
1038        }
1039    }
1040
1041    #[test]
1042    fn test_handler_group_builder_add_after() {
1043        let mut builder =
1044            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1045        builder
1046            .add_handler_after("MailboxHandler", CollectStatsHandler::default())
1047            .unwrap();
1048
1049        let group = builder.build().unwrap();
1050        let handlers = group.handlers;
1051        let names = [
1052            "ResponseHeaderHandler",
1053            "DatanodeKeepLeaseHandler",
1054            "FlownodeKeepLeaseHandler",
1055            "CheckLeaderHandler",
1056            "OnLeaderStartHandler",
1057            "ExtractStatHandler",
1058            "CollectDatanodeClusterInfoHandler",
1059            "CollectFrontendClusterInfoHandler",
1060            "CollectFlownodeClusterInfoHandler",
1061            "MailboxHandler",
1062            "CollectStatsHandler",
1063            "FilterInactiveRegionStatsHandler",
1064            "CollectLeaderRegionHandler",
1065            "CollectTopicStatsHandler",
1066            "CollectStatsHandler",
1067            "RemapFlowPeerHandler",
1068        ];
1069        assert_eq!(names.len(), handlers.len());
1070        for (handler, name) in handlers.iter().zip(names) {
1071            assert_eq!(handler.name, name);
1072        }
1073    }
1074
1075    #[test]
1076    fn test_handler_group_builder_add_after_last() {
1077        let mut builder =
1078            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1079        builder
1080            .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
1081            .unwrap();
1082
1083        let group = builder.build().unwrap();
1084        let handlers = group.handlers;
1085        let names = [
1086            "ResponseHeaderHandler",
1087            "DatanodeKeepLeaseHandler",
1088            "FlownodeKeepLeaseHandler",
1089            "CheckLeaderHandler",
1090            "OnLeaderStartHandler",
1091            "ExtractStatHandler",
1092            "CollectDatanodeClusterInfoHandler",
1093            "CollectFrontendClusterInfoHandler",
1094            "CollectFlownodeClusterInfoHandler",
1095            "MailboxHandler",
1096            "FilterInactiveRegionStatsHandler",
1097            "CollectLeaderRegionHandler",
1098            "CollectTopicStatsHandler",
1099            "CollectStatsHandler",
1100            "ResponseHeaderHandler",
1101            "RemapFlowPeerHandler",
1102        ];
1103        assert_eq!(names.len(), handlers.len());
1104        for (handler, name) in handlers.iter().zip(names) {
1105            assert_eq!(handler.name, name);
1106        }
1107    }
1108
1109    #[test]
1110    fn test_handler_group_builder_replace() {
1111        let mut builder =
1112            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1113        builder
1114            .replace_handler("MailboxHandler", CollectStatsHandler::default())
1115            .unwrap();
1116
1117        let group = builder.build().unwrap();
1118        let handlers = group.handlers;
1119        let names = [
1120            "ResponseHeaderHandler",
1121            "DatanodeKeepLeaseHandler",
1122            "FlownodeKeepLeaseHandler",
1123            "CheckLeaderHandler",
1124            "OnLeaderStartHandler",
1125            "ExtractStatHandler",
1126            "CollectDatanodeClusterInfoHandler",
1127            "CollectFrontendClusterInfoHandler",
1128            "CollectFlownodeClusterInfoHandler",
1129            "CollectStatsHandler",
1130            "FilterInactiveRegionStatsHandler",
1131            "CollectLeaderRegionHandler",
1132            "CollectTopicStatsHandler",
1133            "CollectStatsHandler",
1134            "RemapFlowPeerHandler",
1135        ];
1136
1137        assert_eq!(names.len(), handlers.len());
1138        for (handler, name) in handlers.iter().zip(names) {
1139            assert_eq!(handler.name, name);
1140        }
1141    }
1142
1143    #[test]
1144    fn test_handler_group_builder_replace_last() {
1145        let mut builder =
1146            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1147        builder
1148            .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
1149            .unwrap();
1150
1151        let group = builder.build().unwrap();
1152        let handlers = group.handlers;
1153        let names = [
1154            "ResponseHeaderHandler",
1155            "DatanodeKeepLeaseHandler",
1156            "FlownodeKeepLeaseHandler",
1157            "CheckLeaderHandler",
1158            "OnLeaderStartHandler",
1159            "ExtractStatHandler",
1160            "CollectDatanodeClusterInfoHandler",
1161            "CollectFrontendClusterInfoHandler",
1162            "CollectFlownodeClusterInfoHandler",
1163            "MailboxHandler",
1164            "FilterInactiveRegionStatsHandler",
1165            "CollectLeaderRegionHandler",
1166            "CollectTopicStatsHandler",
1167            "ResponseHeaderHandler",
1168            "RemapFlowPeerHandler",
1169        ];
1170
1171        assert_eq!(names.len(), handlers.len());
1172        for (handler, name) in handlers.iter().zip(names) {
1173            assert_eq!(handler.name, name);
1174        }
1175    }
1176
1177    #[test]
1178    fn test_handler_group_builder_replace_first() {
1179        let mut builder =
1180            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1181        builder
1182            .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
1183            .unwrap();
1184
1185        let group = builder.build().unwrap();
1186        let handlers = group.handlers;
1187        let names = [
1188            "CollectStatsHandler",
1189            "DatanodeKeepLeaseHandler",
1190            "FlownodeKeepLeaseHandler",
1191            "CheckLeaderHandler",
1192            "OnLeaderStartHandler",
1193            "ExtractStatHandler",
1194            "CollectDatanodeClusterInfoHandler",
1195            "CollectFrontendClusterInfoHandler",
1196            "CollectFlownodeClusterInfoHandler",
1197            "MailboxHandler",
1198            "FilterInactiveRegionStatsHandler",
1199            "CollectLeaderRegionHandler",
1200            "CollectTopicStatsHandler",
1201            "CollectStatsHandler",
1202            "RemapFlowPeerHandler",
1203        ];
1204        assert_eq!(names.len(), handlers.len());
1205        for (handler, name) in handlers.iter().zip(names) {
1206            assert_eq!(handler.name, name);
1207        }
1208    }
1209
1210    #[test]
1211    fn test_handler_group_builder_handler_not_found() {
1212        let mut builder =
1213            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1214        let err = builder
1215            .add_handler_before("NotExists", CollectStatsHandler::default())
1216            .unwrap_err();
1217        assert_matches!(err, error::Error::HandlerNotFound { .. });
1218
1219        let err = builder
1220            .add_handler_after("NotExists", CollectStatsHandler::default())
1221            .unwrap_err();
1222        assert_matches!(err, error::Error::HandlerNotFound { .. });
1223
1224        let err = builder
1225            .replace_handler("NotExists", CollectStatsHandler::default())
1226            .unwrap_err();
1227        assert_matches!(err, error::Error::HandlerNotFound { .. });
1228    }
1229
1230    #[tokio::test]
1231    async fn test_pusher_drop() {
1232        let (tx, _rx) = mpsc::channel(1);
1233        let pusher = Pusher::new(tx);
1234        let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1235
1236        drop(pusher);
1237        deregister_signal_tx.changed().await.unwrap();
1238    }
1239}