meta_srv/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23    HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, ResponseHeader, Role,
24    PROTOCOL_VERSION,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28    CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29    CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::{Instruction, InstructionReply};
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{oneshot, watch, Notify, RwLock};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::flow_state_handler::FlowStateHandler;
57use crate::metasrv::Context;
58use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
59use crate::pubsub::PublisherRef;
60use crate::service::mailbox::{
61    BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
62};
63
64pub mod check_leader_handler;
65pub mod collect_cluster_info_handler;
66pub mod collect_leader_region_handler;
67pub mod collect_stats_handler;
68pub mod extract_stat_handler;
69pub mod failure_handler;
70pub mod filter_inactive_region_stats;
71pub mod flow_state_handler;
72pub mod keep_lease_handler;
73pub mod mailbox_handler;
74pub mod on_leader_start_handler;
75pub mod publish_heartbeat_handler;
76pub mod region_lease_handler;
77pub mod remap_flow_peer_handler;
78pub mod response_header_handler;
79
80#[async_trait::async_trait]
81pub trait HeartbeatHandler: Send + Sync {
82    fn is_acceptable(&self, role: Role) -> bool;
83
84    fn name(&self) -> &'static str {
85        let type_name = std::any::type_name::<Self>();
86        // short name
87        type_name.split("::").last().unwrap_or(type_name)
88    }
89
90    async fn handle(
91        &self,
92        req: &HeartbeatRequest,
93        ctx: &mut Context,
94        acc: &mut HeartbeatAccumulator,
95    ) -> Result<HandleControl>;
96}
97
/// HandleControl
///
/// Controls process of handling heartbeat request.
///
/// Returned by [`HeartbeatHandler::handle`] and checked by [`HeartbeatHandlerGroup::handle`]
/// after each handler has run.
#[derive(PartialEq, Debug)]
pub enum HandleControl {
    /// Keep going: the next handler in the group is invoked.
    Continue,
    /// Stop here: the remaining handlers are skipped for this request.
    Done,
}
106
/// Scratch state shared by all handlers while one heartbeat request is processed.
///
/// A fresh, default-initialized accumulator is created per request by
/// [`HeartbeatHandlerGroup::handle`].
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
    /// Response header; moved into the final `HeartbeatResponse`.
    pub header: Option<ResponseHeader>,
    /// Instructions collected by handlers.
    // NOTE(review): not read anywhere in this file — confirm where these are consumed.
    pub instructions: Vec<Instruction>,
    /// Statistics of the reporting node.
    // NOTE(review): presumably filled in by `ExtractStatHandler` — confirm.
    pub stat: Option<Stat>,
    /// Ids of regions considered inactive.
    // NOTE(review): producer and consumer live in other handler modules — confirm.
    pub inactive_region_ids: HashSet<RegionId>,
    /// Region lease; moved into the final `HeartbeatResponse`.
    pub region_lease: Option<RegionLease>,
}
115
impl HeartbeatAccumulator {
    /// Converts the accumulated state into a [`MailboxMessage`].
    ///
    /// Not implemented yet: the accumulator is consumed and `None` is always returned.
    pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
        // TODO(jiachun): to HeartbeatResponse payload
        None
    }
}
122
/// Identifies a heartbeat [`Pusher`] by the role and the numeric id of its peer.
#[derive(Copy, Clone)]
pub struct PusherId {
    /// The role of the peer.
    pub role: Role,
    /// The numeric id of the peer.
    pub id: u64,
}
128
129impl Debug for PusherId {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        write!(f, "{:?}-{}", self.role, self.id)
132    }
133}
134
135impl Display for PusherId {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        write!(f, "{:?}-{}", self.role, self.id)
138    }
139}
140
141impl PusherId {
142    pub fn new(role: Role, id: u64) -> Self {
143        Self { role, id }
144    }
145
146    pub fn string_key(&self) -> String {
147        format!("{}-{}", self.role as i32, self.id)
148    }
149}
150
/// The receiver of the deregister signal.
///
/// The watched value starts as `false` and becomes `true` once the owning [`Pusher`] is
/// dropped.
pub type DeregisterSignalReceiver = watch::Receiver<bool>;
153
/// The pusher of the heartbeat response.
///
/// Wraps the sending half of the channel through which [`HeartbeatResponse`]s are delivered,
/// together with a watch channel that announces the pusher's deregistration.
pub struct Pusher {
    /// Channel through which responses (or a `tonic::Status`) are sent.
    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
    // The sender of the deregister signal.
    // The initial value is `false`, meaning the pusher is not deregistered.
    // When the pusher is dropped, `true` is sent to notify the receivers.
    deregister_signal_sender: watch::Sender<bool>,
    /// Receiver half kept so that clones can be handed out to interested parties.
    deregister_signal_receiver: DeregisterSignalReceiver,

    /// Response header template; a clone is returned by [`Pusher::header`].
    res_header: ResponseHeader,
}
165
impl Drop for Pusher {
    /// Publishes `true` on the deregister signal so that holders of a
    /// [`DeregisterSignalReceiver`] learn that this pusher is gone.
    fn drop(&mut self) {
        // Ignore the error here.
        // If all the receivers have been dropped, nobody cares about the deregister signal.
        let _ = self.deregister_signal_sender.send(true);
    }
}
173
174impl Pusher {
175    pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
176        let res_header = ResponseHeader {
177            protocol_version: PROTOCOL_VERSION,
178            ..Default::default()
179        };
180        let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
181        Self {
182            sender,
183            deregister_signal_sender,
184            deregister_signal_receiver,
185            res_header,
186        }
187    }
188
189    #[inline]
190    pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
191        self.sender.send(Ok(res)).await.map_err(|e| {
192            error::PushMessageSnafu {
193                err_msg: e.to_string(),
194            }
195            .build()
196        })
197    }
198
199    #[inline]
200    pub fn header(&self) -> ResponseHeader {
201        self.res_header.clone()
202    }
203}
204
/// The group of heartbeat pushers.
///
/// Pushers are stored in an ordered map keyed by [`PusherId::string_key`], which lets
/// `broadcast` address a range of keys. Clones share the same underlying map.
#[derive(Clone, Default)]
pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
208
209impl Pushers {
210    async fn push(
211        &self,
212        pusher_id: PusherId,
213        mailbox_message: MailboxMessage,
214    ) -> Result<DeregisterSignalReceiver> {
215        let pusher_id = pusher_id.string_key();
216        let pushers = self.0.read().await;
217        let pusher = pushers
218            .get(&pusher_id)
219            .context(error::PusherNotFoundSnafu { pusher_id })?;
220
221        pusher
222            .push(HeartbeatResponse {
223                header: Some(pusher.header()),
224                mailbox_message: Some(mailbox_message),
225                ..Default::default()
226            })
227            .await?;
228
229        Ok(pusher.deregister_signal_receiver.clone())
230    }
231
232    async fn broadcast(
233        &self,
234        range: Range<String>,
235        mailbox_message: &MailboxMessage,
236    ) -> Result<()> {
237        let pushers = self.0.read().await;
238        let pushers = pushers
239            .range(range)
240            .map(|(_, value)| value)
241            .collect::<Vec<_>>();
242        let mut results = Vec::with_capacity(pushers.len());
243
244        for pusher in pushers {
245            let mut mailbox_message = mailbox_message.clone();
246            mailbox_message.id = 0; // one-way message
247
248            results.push(pusher.push(HeartbeatResponse {
249                header: Some(pusher.header()),
250                mailbox_message: Some(mailbox_message),
251                ..Default::default()
252            }))
253        }
254
255        // Checks the error out of the loop.
256        let _ = join_all(results)
257            .await
258            .into_iter()
259            .collect::<Result<Vec<_>>>()?;
260
261        Ok(())
262    }
263
264    pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
265        self.0.write().await.insert(pusher_id, pusher)
266    }
267
268    async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
269        self.0.write().await.remove(pusher_id)
270    }
271}
272
/// A [`HeartbeatHandler`] paired with its name.
///
/// The name is looked up once at construction and reused for handler lookups in the builder
/// and as the metric label in [`HeartbeatHandlerGroup::handle`].
#[derive(Clone)]
pub struct NameCachedHandler {
    /// The value of [`HeartbeatHandler::name`] captured at construction time.
    name: &'static str,
    /// The wrapped handler.
    handler: Arc<dyn HeartbeatHandler>,
}
278
279impl NameCachedHandler {
280    fn new(handler: impl HeartbeatHandler + 'static) -> Self {
281        let name = handler.name();
282        let handler = Arc::new(handler);
283        Self { name, handler }
284    }
285}
286
/// A shared, reference-counted [`HeartbeatHandlerGroup`].
pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
288
/// The group of heartbeat handlers.
///
/// Holds the ordered handler pipeline applied to every heartbeat request and the
/// [`Pushers`] used to send responses back to connected peers.
#[derive(Default, Clone)]
pub struct HeartbeatHandlerGroup {
    /// Handlers in the order in which they are run.
    handlers: Vec<NameCachedHandler>,
    /// The registered heartbeat response pushers.
    pushers: Pushers,
}
295
296impl HeartbeatHandlerGroup {
297    /// Registers the heartbeat response [`Pusher`] with the given key to the group.
298    pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
299        METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
300        info!("Pusher register: {}", pusher_id);
301        let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
302    }
303
304    /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
305    ///
306    /// Returns the [`Pusher`] if it exists.
307    pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
308        METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
309        info!("Pusher unregister: {}", pusher_id);
310        self.pushers.remove(&pusher_id.string_key()).await
311    }
312
313    /// Returns the [`Pushers`] of the group.
314    pub fn pushers(&self) -> Pushers {
315        self.pushers.clone()
316    }
317
318    /// Handles the heartbeat request.
319    pub async fn handle(
320        &self,
321        req: HeartbeatRequest,
322        mut ctx: Context,
323    ) -> Result<HeartbeatResponse> {
324        let mut acc = HeartbeatAccumulator::default();
325        let role = req
326            .header
327            .as_ref()
328            .and_then(|h| Role::try_from(h.role).ok())
329            .context(error::InvalidArgumentsSnafu {
330                err_msg: format!("invalid role: {:?}", req.header),
331            })?;
332
333        for NameCachedHandler { name, handler } in self.handlers.iter() {
334            if !handler.is_acceptable(role) {
335                continue;
336            }
337
338            let _timer = METRIC_META_HANDLER_EXECUTE
339                .with_label_values(&[*name])
340                .start_timer();
341
342            if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
343                break;
344            }
345        }
346        let header = std::mem::take(&mut acc.header);
347        let res = HeartbeatResponse {
348            header,
349            region_lease: acc.region_lease,
350            ..Default::default()
351        };
352        Ok(res)
353    }
354}
355
/// A [`Mailbox`] that delivers messages through the heartbeat response [`Pushers`].
pub struct HeartbeatMailbox {
    /// The pushers used to deliver mailbox messages.
    pushers: Pushers,
    /// Source of message ids for messages that expect a reply.
    sequence: Sequence,
    /// Reply channels of in-flight messages, keyed by message id.
    senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
    /// Reply deadlines of in-flight messages, keyed by message id.
    timeouts: DashMap<MessageId, Instant>,
    /// Wakes the timeout checker when a new deadline has been registered.
    timeout_notify: Notify,
}
363
364impl HeartbeatMailbox {
365    pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
366        let Payload::Json(payload) =
367            msg.payload
368                .as_ref()
369                .with_context(|| UnexpectedInstructionReplySnafu {
370                    mailbox_message: msg.to_string(),
371                    reason: format!("empty payload, msg: {msg:?}"),
372                })?;
373        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
374    }
375
376    /// Parses the [Instruction] from [MailboxMessage].
377    #[cfg(test)]
378    pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
379        let Payload::Json(payload) =
380            msg.payload
381                .as_ref()
382                .with_context(|| UnexpectedInstructionReplySnafu {
383                    mailbox_message: msg.to_string(),
384                    reason: format!("empty payload, msg: {msg:?}"),
385                })?;
386        serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
387    }
388
389    pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
390        let mailbox = Arc::new(Self::new(pushers, sequence));
391
392        let timeout_checker = mailbox.clone();
393        let _handle = common_runtime::spawn_global(async move {
394            timeout_checker.check_timeout_bg(10).await;
395        });
396
397        mailbox
398    }
399
400    fn new(pushers: Pushers, sequence: Sequence) -> Self {
401        Self {
402            pushers,
403            sequence,
404            senders: DashMap::default(),
405            timeouts: DashMap::default(),
406            timeout_notify: Notify::new(),
407        }
408    }
409
410    async fn check_timeout_bg(&self, interval_millis: u64) {
411        let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
412
413        loop {
414            let _ = interval.tick().await;
415
416            if self.timeouts.is_empty() {
417                self.timeout_notify.notified().await;
418            }
419
420            let now = Instant::now();
421            let timeout_ids = self
422                .timeouts
423                .iter()
424                .filter_map(|entry| {
425                    let (id, deadline) = entry.pair();
426                    if deadline < &now {
427                        Some(*id)
428                    } else {
429                        None
430                    }
431                })
432                .collect::<Vec<_>>();
433
434            for id in timeout_ids {
435                let _ = self
436                    .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
437                    .await;
438            }
439        }
440    }
441
442    #[inline]
443    async fn next_message_id(&self) -> Result<u64> {
444        // In this implementation, we pre-occupy the message_id of 0,
445        // and we use `message_id = 0` to mark a Message as a one-way call.
446        loop {
447            let next = self
448                .sequence
449                .next()
450                .await
451                .context(error::NextSequenceSnafu)?;
452            if next > 0 {
453                return Ok(next);
454            }
455        }
456    }
457}
458
459#[async_trait::async_trait]
460impl Mailbox for HeartbeatMailbox {
461    async fn send(
462        &self,
463        ch: &Channel,
464        mut msg: MailboxMessage,
465        timeout: Duration,
466    ) -> Result<MailboxReceiver> {
467        let message_id = self.next_message_id().await?;
468        msg.id = message_id;
469
470        let pusher_id = ch.pusher_id();
471        debug!("Sending mailbox message {msg:?} to {pusher_id}");
472
473        let (tx, rx) = oneshot::channel();
474        let _ = self.senders.insert(message_id, tx);
475        let deadline = Instant::now() + timeout;
476        self.timeouts.insert(message_id, deadline);
477        self.timeout_notify.notify_one();
478
479        let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
480
481        Ok(MailboxReceiver::new(
482            message_id,
483            rx,
484            deregister_signal_receiver,
485            *ch,
486        ))
487    }
488
489    async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
490        let message_id = 0; // one-way message, same as `broadcast`
491        msg.id = message_id;
492
493        let pusher_id = ch.pusher_id();
494        debug!("Sending mailbox message {msg:?} to {pusher_id}");
495
496        self.pushers.push(pusher_id, msg).await?;
497
498        Ok(())
499    }
500
501    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
502        self.pushers.broadcast(ch.pusher_range(), msg).await
503    }
504
505    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
506        debug!("Received mailbox message {maybe_msg:?}");
507
508        let _ = self.timeouts.remove(&id);
509
510        if let Some((_, tx)) = self.senders.remove(&id) {
511            tx.send(maybe_msg)
512                .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
513        } else if let Ok(finally_msg) = maybe_msg {
514            warn!("The response arrived too late: {finally_msg:?}");
515        }
516
517        Ok(())
518    }
519}
520
/// The builder to build the group of heartbeat handlers.
///
/// Optional handlers configured here are only added to the pipeline by
/// `add_default_handlers`.
pub struct HeartbeatHandlerGroupBuilder {
    /// The handler to handle region failure.
    region_failure_handler: Option<RegionFailureHandler>,

    /// The handler to handle region lease.
    region_lease_handler: Option<RegionLeaseHandler>,

    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    flush_stats_factor: Option<usize>,
    /// A simple handler for reporting the internal state of flows.
    flow_state_handler: Option<FlowStateHandler>,

    /// The plugins.
    plugins: Option<Plugins>,

    /// The heartbeat response pushers.
    pushers: Pushers,

    /// The group of heartbeat handlers.
    handlers: Vec<NameCachedHandler>,
}
545
546impl HeartbeatHandlerGroupBuilder {
547    pub fn new(pushers: Pushers) -> Self {
548        Self {
549            region_failure_handler: None,
550            region_lease_handler: None,
551            flush_stats_factor: None,
552            flow_state_handler: None,
553            plugins: None,
554            pushers,
555            handlers: vec![],
556        }
557    }
558
559    pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
560        self.flow_state_handler = handler;
561        self
562    }
563
564    pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
565        self.region_lease_handler = handler;
566        self
567    }
568
569    /// Sets the [`RegionFailureHandler`].
570    pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
571        self.region_failure_handler = handler;
572        self
573    }
574
575    /// Sets the flush stats factor.
576    pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
577        self.flush_stats_factor = flush_stats_factor;
578        self
579    }
580
581    /// Sets the [`Plugins`].
582    pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
583        self.plugins = plugins;
584        self
585    }
586
587    /// Adds the default handlers.
588    pub fn add_default_handlers(mut self) -> Self {
589        // Extract the `PublishHeartbeatHandler` from the plugins.
590        let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
591            plugins
592                .get::<PublisherRef>()
593                .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
594        } else {
595            None
596        };
597
598        self.add_handler_last(ResponseHeaderHandler);
599        // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
600        // because even if the current meta-server node is no longer the leader it can
601        // still help the datanode to keep lease.
602        self.add_handler_last(DatanodeKeepLeaseHandler);
603        self.add_handler_last(FlownodeKeepLeaseHandler);
604        self.add_handler_last(CheckLeaderHandler);
605        self.add_handler_last(OnLeaderStartHandler);
606        self.add_handler_last(ExtractStatHandler);
607        self.add_handler_last(CollectDatanodeClusterInfoHandler);
608        self.add_handler_last(CollectFrontendClusterInfoHandler);
609        self.add_handler_last(CollectFlownodeClusterInfoHandler);
610        self.add_handler_last(MailboxHandler);
611        if let Some(region_lease_handler) = self.region_lease_handler.take() {
612            self.add_handler_last(region_lease_handler);
613        }
614        self.add_handler_last(FilterInactiveRegionStatsHandler);
615        if let Some(region_failure_handler) = self.region_failure_handler.take() {
616            self.add_handler_last(region_failure_handler);
617        }
618        if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
619            self.add_handler_last(publish_heartbeat_handler);
620        }
621        self.add_handler_last(CollectLeaderRegionHandler);
622        self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
623        self.add_handler_last(RemapFlowPeerHandler::default());
624
625        if let Some(flow_state_handler) = self.flow_state_handler.take() {
626            self.add_handler_last(flow_state_handler);
627        }
628
629        self
630    }
631
632    /// Builds the group of heartbeat handlers.
633    ///
634    /// Applies the customizer if it exists.
635    pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
636        if let Some(customizer) = self
637            .plugins
638            .as_ref()
639            .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
640        {
641            debug!("Customizing the heartbeat handler group builder");
642            customizer.customize(&mut self)?;
643        }
644
645        Ok(HeartbeatHandlerGroup {
646            handlers: self.handlers,
647            pushers: self.pushers,
648        })
649    }
650
651    fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
652        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
653            self.handlers.insert(pos + 1, handler);
654            return Ok(());
655        }
656
657        error::HandlerNotFoundSnafu { name: target }.fail()
658    }
659
660    /// Adds the handler after the specified handler.
661    pub fn add_handler_after(
662        &mut self,
663        target: &'static str,
664        handler: impl HeartbeatHandler + 'static,
665    ) -> Result<()> {
666        self.add_handler_after_inner(target, NameCachedHandler::new(handler))
667    }
668
669    fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
670        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
671            self.handlers.insert(pos, handler);
672            return Ok(());
673        }
674
675        error::HandlerNotFoundSnafu { name: target }.fail()
676    }
677
678    /// Adds the handler before the specified handler.
679    pub fn add_handler_before(
680        &mut self,
681        target: &'static str,
682        handler: impl HeartbeatHandler + 'static,
683    ) -> Result<()> {
684        self.add_handler_before_inner(target, NameCachedHandler::new(handler))
685    }
686
687    fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
688        if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
689            self.handlers[pos] = handler;
690            return Ok(());
691        }
692
693        error::HandlerNotFoundSnafu { name: target }.fail()
694    }
695
696    /// Replaces the handler with the specified name.
697    pub fn replace_handler(
698        &mut self,
699        target: &'static str,
700        handler: impl HeartbeatHandler + 'static,
701    ) -> Result<()> {
702        self.replace_handler_inner(target, NameCachedHandler::new(handler))
703    }
704
705    fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
706        self.handlers.push(handler);
707    }
708
709    fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
710        self.add_handler_last_inner(NameCachedHandler::new(handler));
711    }
712}
713
/// A shared, reference-counted [`HeartbeatHandlerGroupBuilderCustomizer`].
///
/// `HeartbeatHandlerGroupBuilder::build` looks a value of this type up in the plugins.
pub type HeartbeatHandlerGroupBuilderCustomizerRef =
    Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
716
/// A deferred modification of the handler pipeline of a [`HeartbeatHandlerGroupBuilder`].
pub enum CustomizeHeartbeatGroupAction {
    /// Insert `handler` right after the handler named `target`.
    AddHandlerAfter {
        target: String,
        handler: NameCachedHandler,
    },
    /// Insert `handler` right before the handler named `target`.
    AddHandlerBefore {
        target: String,
        handler: NameCachedHandler,
    },
    /// Put `handler` in the place of the handler named `target`.
    ReplaceHandler {
        target: String,
        handler: NameCachedHandler,
    },
    /// Append `handler` to the end of the pipeline.
    AddHandlerLast {
        handler: NameCachedHandler,
    },
}
734
735impl CustomizeHeartbeatGroupAction {
736    pub fn new_add_handler_after(
737        target: &'static str,
738        handler: impl HeartbeatHandler + 'static,
739    ) -> Self {
740        Self::AddHandlerAfter {
741            target: target.to_string(),
742            handler: NameCachedHandler::new(handler),
743        }
744    }
745
746    pub fn new_add_handler_before(
747        target: &'static str,
748        handler: impl HeartbeatHandler + 'static,
749    ) -> Self {
750        Self::AddHandlerBefore {
751            target: target.to_string(),
752            handler: NameCachedHandler::new(handler),
753        }
754    }
755
756    pub fn new_replace_handler(
757        target: &'static str,
758        handler: impl HeartbeatHandler + 'static,
759    ) -> Self {
760        Self::ReplaceHandler {
761            target: target.to_string(),
762            handler: NameCachedHandler::new(handler),
763        }
764    }
765
766    pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
767        Self::AddHandlerLast {
768            handler: NameCachedHandler::new(handler),
769        }
770    }
771}
772
/// The customizer of the [`HeartbeatHandlerGroupBuilder`].
///
/// `HeartbeatHandlerGroupBuilder::build` invokes the customizer found in the plugins before
/// the handler group is assembled.
pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
    /// Applies the customization to `builder`.
    ///
    /// An error returned here makes `HeartbeatHandlerGroupBuilder::build` fail.
    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;

    /// Hands `action` over to the customizer.
    // NOTE(review): the default implementation queues the action until the next
    // `customize` call; other implementations may behave differently.
    fn add_action(&self, action: CustomizeHeartbeatGroupAction);
}
779
/// A [`HeartbeatHandlerGroupBuilderCustomizer`] that records actions and replays them, in
/// insertion order, when the builder is customized.
#[derive(Default)]
pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
    /// Actions waiting to be applied; drained by `customize`.
    actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
}
784
785impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
786    fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
787        info!("Customizing the heartbeat handler group builder");
788        let mut actions = self.actions.lock().unwrap();
789        for action in actions.drain(..) {
790            match action {
791                CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
792                    builder.add_handler_after_inner(&target, handler)?;
793                }
794                CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
795                    builder.add_handler_before_inner(&target, handler)?;
796                }
797                CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
798                    builder.replace_handler_inner(&target, handler)?;
799                }
800                CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
801                    builder.add_handler_last_inner(handler);
802                }
803            }
804        }
805        Ok(())
806    }
807
808    fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
809        self.actions.lock().unwrap().push(action);
810    }
811}
812
813#[cfg(test)]
814mod tests {
815
816    use std::assert_matches::assert_matches;
817    use std::sync::Arc;
818    use std::time::Duration;
819
820    use api::v1::meta::{MailboxMessage, Role};
821    use common_meta::kv_backend::memory::MemoryKvBackend;
822    use common_meta::sequence::SequenceBuilder;
823    use tokio::sync::mpsc;
824
825    use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
826    use crate::error;
827    use crate::handler::collect_stats_handler::CollectStatsHandler;
828    use crate::handler::response_header_handler::ResponseHeaderHandler;
829    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
830    use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
831
832    #[tokio::test]
833    async fn test_mailbox() {
834        let (mailbox, receiver) = push_msg_via_mailbox().await;
835        let id = receiver.message_id();
836
837        let resp_msg = MailboxMessage {
838            id,
839            subject: "resp-test".to_string(),
840            timestamp_millis: 456,
841            ..Default::default()
842        };
843
844        mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
845
846        let recv_msg = receiver.await.unwrap();
847        assert_eq!(recv_msg.id, id);
848        assert_eq!(recv_msg.timestamp_millis, 456);
849        assert_eq!(recv_msg.subject, "resp-test".to_string());
850    }
851
852    #[tokio::test]
853    async fn test_mailbox_timeout() {
854        let (_, receiver) = push_msg_via_mailbox().await;
855        let res = receiver.await;
856        assert!(res.is_err());
857    }
858
859    async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
860        let datanode_id = 12;
861        let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
862        let pusher_id = PusherId::new(Role::Datanode, datanode_id);
863        let pusher: Pusher = Pusher::new(pusher_tx);
864        let handler_group = HeartbeatHandlerGroup::default();
865        handler_group.register_pusher(pusher_id, pusher).await;
866
867        let kv_backend = Arc::new(MemoryKvBackend::new());
868        let seq = SequenceBuilder::new("test_seq", kv_backend).build();
869        let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
870
871        let msg = MailboxMessage {
872            id: 0,
873            subject: "req-test".to_string(),
874            timestamp_millis: 123,
875            ..Default::default()
876        };
877        let ch = Channel::Datanode(datanode_id);
878
879        let receiver = mailbox
880            .send(&ch, msg, Duration::from_secs(1))
881            .await
882            .unwrap();
883
884        let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
885        let message = recv_obj.mailbox_message.unwrap();
886        assert_eq!(message.timestamp_millis, 123);
887        assert_eq!(message.subject, "req-test".to_string());
888
889        (mailbox, receiver)
890    }
891
892    #[test]
893    fn test_handler_group_builder() {
894        let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
895            .add_default_handlers()
896            .build()
897            .unwrap();
898
899        let handlers = group.handlers;
900        assert_eq!(14, handlers.len());
901
902        let names = [
903            "ResponseHeaderHandler",
904            "DatanodeKeepLeaseHandler",
905            "FlownodeKeepLeaseHandler",
906            "CheckLeaderHandler",
907            "OnLeaderStartHandler",
908            "ExtractStatHandler",
909            "CollectDatanodeClusterInfoHandler",
910            "CollectFrontendClusterInfoHandler",
911            "CollectFlownodeClusterInfoHandler",
912            "MailboxHandler",
913            "FilterInactiveRegionStatsHandler",
914            "CollectLeaderRegionHandler",
915            "CollectStatsHandler",
916            "RemapFlowPeerHandler",
917        ];
918
919        for (handler, name) in handlers.iter().zip(names.into_iter()) {
920            assert_eq!(handler.name, name);
921        }
922    }
923
924    #[test]
925    fn test_handler_group_builder_add_before() {
926        let mut builder =
927            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
928        builder
929            .add_handler_before(
930                "FilterInactiveRegionStatsHandler",
931                CollectStatsHandler::default(),
932            )
933            .unwrap();
934
935        let group = builder.build().unwrap();
936        let handlers = group.handlers;
937        assert_eq!(15, handlers.len());
938
939        let names = [
940            "ResponseHeaderHandler",
941            "DatanodeKeepLeaseHandler",
942            "FlownodeKeepLeaseHandler",
943            "CheckLeaderHandler",
944            "OnLeaderStartHandler",
945            "ExtractStatHandler",
946            "CollectDatanodeClusterInfoHandler",
947            "CollectFrontendClusterInfoHandler",
948            "CollectFlownodeClusterInfoHandler",
949            "MailboxHandler",
950            "CollectStatsHandler",
951            "FilterInactiveRegionStatsHandler",
952            "CollectLeaderRegionHandler",
953            "CollectStatsHandler",
954            "RemapFlowPeerHandler",
955        ];
956
957        for (handler, name) in handlers.iter().zip(names.into_iter()) {
958            assert_eq!(handler.name, name);
959        }
960    }
961
962    #[test]
963    fn test_handler_group_builder_add_before_first() {
964        let mut builder =
965            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
966        builder
967            .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
968            .unwrap();
969
970        let group = builder.build().unwrap();
971        let handlers = group.handlers;
972        assert_eq!(15, handlers.len());
973
974        let names = [
975            "CollectStatsHandler",
976            "ResponseHeaderHandler",
977            "DatanodeKeepLeaseHandler",
978            "FlownodeKeepLeaseHandler",
979            "CheckLeaderHandler",
980            "OnLeaderStartHandler",
981            "ExtractStatHandler",
982            "CollectDatanodeClusterInfoHandler",
983            "CollectFrontendClusterInfoHandler",
984            "CollectFlownodeClusterInfoHandler",
985            "MailboxHandler",
986            "FilterInactiveRegionStatsHandler",
987            "CollectLeaderRegionHandler",
988            "CollectStatsHandler",
989            "RemapFlowPeerHandler",
990        ];
991
992        for (handler, name) in handlers.iter().zip(names.into_iter()) {
993            assert_eq!(handler.name, name);
994        }
995    }
996
997    #[test]
998    fn test_handler_group_builder_add_after() {
999        let mut builder =
1000            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1001        builder
1002            .add_handler_after("MailboxHandler", CollectStatsHandler::default())
1003            .unwrap();
1004
1005        let group = builder.build().unwrap();
1006        let handlers = group.handlers;
1007        assert_eq!(15, handlers.len());
1008
1009        let names = [
1010            "ResponseHeaderHandler",
1011            "DatanodeKeepLeaseHandler",
1012            "FlownodeKeepLeaseHandler",
1013            "CheckLeaderHandler",
1014            "OnLeaderStartHandler",
1015            "ExtractStatHandler",
1016            "CollectDatanodeClusterInfoHandler",
1017            "CollectFrontendClusterInfoHandler",
1018            "CollectFlownodeClusterInfoHandler",
1019            "MailboxHandler",
1020            "CollectStatsHandler",
1021            "FilterInactiveRegionStatsHandler",
1022            "CollectLeaderRegionHandler",
1023            "CollectStatsHandler",
1024            "RemapFlowPeerHandler",
1025        ];
1026
1027        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1028            assert_eq!(handler.name, name);
1029        }
1030    }
1031
1032    #[test]
1033    fn test_handler_group_builder_add_after_last() {
1034        let mut builder =
1035            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1036        builder
1037            .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
1038            .unwrap();
1039
1040        let group = builder.build().unwrap();
1041        let handlers = group.handlers;
1042        assert_eq!(15, handlers.len());
1043
1044        let names = [
1045            "ResponseHeaderHandler",
1046            "DatanodeKeepLeaseHandler",
1047            "FlownodeKeepLeaseHandler",
1048            "CheckLeaderHandler",
1049            "OnLeaderStartHandler",
1050            "ExtractStatHandler",
1051            "CollectDatanodeClusterInfoHandler",
1052            "CollectFrontendClusterInfoHandler",
1053            "CollectFlownodeClusterInfoHandler",
1054            "MailboxHandler",
1055            "FilterInactiveRegionStatsHandler",
1056            "CollectLeaderRegionHandler",
1057            "CollectStatsHandler",
1058            "ResponseHeaderHandler",
1059            "RemapFlowPeerHandler",
1060        ];
1061
1062        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1063            assert_eq!(handler.name, name);
1064        }
1065    }
1066
1067    #[test]
1068    fn test_handler_group_builder_replace() {
1069        let mut builder =
1070            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1071        builder
1072            .replace_handler("MailboxHandler", CollectStatsHandler::default())
1073            .unwrap();
1074
1075        let group = builder.build().unwrap();
1076        let handlers = group.handlers;
1077        assert_eq!(14, handlers.len());
1078
1079        let names = [
1080            "ResponseHeaderHandler",
1081            "DatanodeKeepLeaseHandler",
1082            "FlownodeKeepLeaseHandler",
1083            "CheckLeaderHandler",
1084            "OnLeaderStartHandler",
1085            "ExtractStatHandler",
1086            "CollectDatanodeClusterInfoHandler",
1087            "CollectFrontendClusterInfoHandler",
1088            "CollectFlownodeClusterInfoHandler",
1089            "CollectStatsHandler",
1090            "FilterInactiveRegionStatsHandler",
1091            "CollectLeaderRegionHandler",
1092            "CollectStatsHandler",
1093            "RemapFlowPeerHandler",
1094        ];
1095
1096        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1097            assert_eq!(handler.name, name);
1098        }
1099    }
1100
1101    #[test]
1102    fn test_handler_group_builder_replace_last() {
1103        let mut builder =
1104            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1105        builder
1106            .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
1107            .unwrap();
1108
1109        let group = builder.build().unwrap();
1110        let handlers = group.handlers;
1111        assert_eq!(14, handlers.len());
1112
1113        let names = [
1114            "ResponseHeaderHandler",
1115            "DatanodeKeepLeaseHandler",
1116            "FlownodeKeepLeaseHandler",
1117            "CheckLeaderHandler",
1118            "OnLeaderStartHandler",
1119            "ExtractStatHandler",
1120            "CollectDatanodeClusterInfoHandler",
1121            "CollectFrontendClusterInfoHandler",
1122            "CollectFlownodeClusterInfoHandler",
1123            "MailboxHandler",
1124            "FilterInactiveRegionStatsHandler",
1125            "CollectLeaderRegionHandler",
1126            "ResponseHeaderHandler",
1127            "RemapFlowPeerHandler",
1128        ];
1129
1130        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1131            assert_eq!(handler.name, name);
1132        }
1133    }
1134
1135    #[test]
1136    fn test_handler_group_builder_replace_first() {
1137        let mut builder =
1138            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1139        builder
1140            .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
1141            .unwrap();
1142
1143        let group = builder.build().unwrap();
1144        let handlers = group.handlers;
1145        assert_eq!(14, handlers.len());
1146
1147        let names = [
1148            "CollectStatsHandler",
1149            "DatanodeKeepLeaseHandler",
1150            "FlownodeKeepLeaseHandler",
1151            "CheckLeaderHandler",
1152            "OnLeaderStartHandler",
1153            "ExtractStatHandler",
1154            "CollectDatanodeClusterInfoHandler",
1155            "CollectFrontendClusterInfoHandler",
1156            "CollectFlownodeClusterInfoHandler",
1157            "MailboxHandler",
1158            "FilterInactiveRegionStatsHandler",
1159            "CollectLeaderRegionHandler",
1160            "CollectStatsHandler",
1161            "RemapFlowPeerHandler",
1162        ];
1163
1164        for (handler, name) in handlers.iter().zip(names.into_iter()) {
1165            assert_eq!(handler.name, name);
1166        }
1167    }
1168
1169    #[test]
1170    fn test_handler_group_builder_handler_not_found() {
1171        let mut builder =
1172            HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1173        let err = builder
1174            .add_handler_before("NotExists", CollectStatsHandler::default())
1175            .unwrap_err();
1176        assert_matches!(err, error::Error::HandlerNotFound { .. });
1177
1178        let err = builder
1179            .add_handler_after("NotExists", CollectStatsHandler::default())
1180            .unwrap_err();
1181        assert_matches!(err, error::Error::HandlerNotFound { .. });
1182
1183        let err = builder
1184            .replace_handler("NotExists", CollectStatsHandler::default())
1185            .unwrap_err();
1186        assert_matches!(err, error::Error::HandlerNotFound { .. });
1187    }
1188
1189    #[tokio::test]
1190    async fn test_pusher_drop() {
1191        let (tx, _rx) = mpsc::channel(1);
1192        let pusher = Pusher::new(tx);
1193        let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1194
1195        drop(pusher);
1196        deregister_signal_tx.changed().await.unwrap();
1197    }
1198}