Skip to main content

meta_srv/service/
mailbox.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::time::Duration;
20
21use api::v1::meta::{MailboxMessage, Role};
22use futures::{Future, FutureExt};
23use tokio::sync::oneshot;
24
25use crate::error::{self, Result};
26use crate::handler::{DeregisterSignalReceiver, PusherId};
27
28pub type MailboxRef = Arc<dyn Mailbox>;
29
/// Identifier of a mailbox message; carried by [`MailboxReceiver`] and passed
/// to [`Mailbox::on_recv`].
pub type MessageId = u64;
31
/// Addresses a single node for point-to-point mailbox messages: the variant
/// names the node's role and the wrapped `u64` is the node id.
///
/// The type is a small plain value, so it derives `Eq` and `Hash` in addition
/// to `PartialEq`, which lets it be used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
    /// The datanode with the given id.
    Datanode(u64),
    /// The frontend with the given id.
    Frontend(u64),
    /// The flownode with the given id.
    Flownode(u64),
}
38
39impl Display for Channel {
40    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
41        match self {
42            Channel::Datanode(id) => {
43                write!(f, "Datanode-{}", id)
44            }
45            Channel::Frontend(id) => {
46                write!(f, "Frontend-{}", id)
47            }
48            Channel::Flownode(id) => {
49                write!(f, "Flownode-{}", id)
50            }
51        }
52    }
53}
54
55impl Channel {
56    pub(crate) fn pusher_id(&self) -> PusherId {
57        match self {
58            Channel::Datanode(id) => PusherId::new(Role::Datanode, *id),
59            Channel::Frontend(id) => PusherId::new(Role::Frontend, *id),
60            Channel::Flownode(id) => PusherId::new(Role::Flownode, *id),
61        }
62    }
63}
/// Selects the role whose nodes receive a broadcast message.
///
/// Unlike [`Channel`], no node id is carried; only the role is named.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BroadcastChannel {
    /// Broadcast to datanodes.
    Datanode,
    /// Broadcast to frontends.
    Frontend,
    /// Broadcast to flownodes.
    Flownode,
}
69
70impl BroadcastChannel {
71    pub(crate) fn role(&self) -> Role {
72        match self {
73            BroadcastChannel::Datanode => Role::Datanode,
74            BroadcastChannel::Frontend => Role::Frontend,
75            BroadcastChannel::Flownode => Role::Flownode,
76        }
77    }
78}
79
80/// The mailbox receiver
81pub enum MailboxReceiver {
82    Init {
83        message_id: MessageId,
84        ch: Channel,
85        /// The [`MailboxMessage`] receiver
86        rx: Option<oneshot::Receiver<Result<MailboxMessage>>>,
87        /// The pusher deregister signal receiver
88        pusher_deregister_signal_receiver: Option<DeregisterSignalReceiver>,
89    },
90    Polling {
91        message_id: MessageId,
92        ch: Channel,
93        inner_future: Pin<Box<dyn Future<Output = Result<MailboxMessage>> + Send + 'static>>,
94    },
95}
96
97impl MailboxReceiver {
98    pub fn new(
99        message_id: MessageId,
100        rx: oneshot::Receiver<Result<MailboxMessage>>,
101        pusher_deregister_signal_receiver: DeregisterSignalReceiver,
102        ch: Channel,
103    ) -> Self {
104        Self::Init {
105            message_id,
106            rx: Some(rx),
107            pusher_deregister_signal_receiver: Some(pusher_deregister_signal_receiver),
108            ch,
109        }
110    }
111
112    /// Get the message id of the mailbox receiver
113    pub fn message_id(&self) -> MessageId {
114        match self {
115            MailboxReceiver::Init { message_id, .. } => *message_id,
116            MailboxReceiver::Polling { message_id, .. } => *message_id,
117        }
118    }
119
120    /// Get the channel of the mailbox receiver
121    pub fn channel(&self) -> Channel {
122        match self {
123            MailboxReceiver::Init { ch, .. } => *ch,
124            MailboxReceiver::Polling { ch, .. } => *ch,
125        }
126    }
127
128    async fn wait_for_message(
129        rx: oneshot::Receiver<Result<MailboxMessage>>,
130        mut pusher_deregister_signal_receiver: DeregisterSignalReceiver,
131        channel: Channel,
132        message_id: MessageId,
133    ) -> Result<MailboxMessage> {
134        tokio::select! {
135            res = rx => {
136                res.map_err(|e| error::MailboxReceiverSnafu {
137                    id: message_id,
138                    err_msg: e.to_string(),
139                }.build())?
140            }
141            _ = pusher_deregister_signal_receiver.changed() => {
142                 Err(error::MailboxChannelClosedSnafu {
143                    channel,
144                }.build())
145            }
146        }
147    }
148}
149
150impl Future for MailboxReceiver {
151    type Output = Result<MailboxMessage>;
152
153    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154        match &mut *self {
155            MailboxReceiver::Init {
156                message_id,
157                ch,
158                rx,
159                pusher_deregister_signal_receiver,
160            } => {
161                let polling = MailboxReceiver::Polling {
162                    message_id: *message_id,
163                    ch: *ch,
164                    inner_future: Self::wait_for_message(
165                        rx.take().expect("rx already taken"),
166                        pusher_deregister_signal_receiver
167                            .take()
168                            .expect("pusher_deregister_signal_receiver already taken"),
169                        *ch,
170                        *message_id,
171                    )
172                    .boxed(),
173                };
174
175                *self = polling;
176                self.poll(cx)
177            }
178            MailboxReceiver::Polling { inner_future, .. } => {
179                let result = futures::ready!(inner_future.as_mut().poll(cx));
180                Poll::Ready(result)
181            }
182        }
183    }
184}
185
186#[async_trait::async_trait]
187pub trait Mailbox: Send + Sync {
188    async fn send(
189        &self,
190        ch: &Channel,
191        msg: MailboxMessage,
192        timeout: Duration,
193    ) -> Result<MailboxReceiver>;
194
195    async fn send_oneway(&self, ch: &Channel, msg: MailboxMessage) -> Result<()>;
196
197    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
198
199    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
200}
201
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_time::util::current_time_millis;
    use tokio::sync::watch;

    use super::*;

    /// Every broadcast channel maps to the role of the same name.
    #[test]
    fn test_broadcast_channel_role() {
        let expectations = [
            (BroadcastChannel::Datanode, Role::Datanode),
            (BroadcastChannel::Frontend, Role::Frontend),
            (BroadcastChannel::Flownode, Role::Flownode),
        ];

        for (channel, expected_role) in expectations {
            assert_eq!(channel.role(), expected_role);
        }
    }

    /// A reply sent through the oneshot channel is what the receiver yields.
    #[tokio::test]
    async fn test_mailbox_receiver() {
        let (reply_tx, reply_rx) = oneshot::channel();
        // The sender is bound (not dropped) so it stays alive for the test.
        let (_deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
        let receiver =
            MailboxReceiver::new(1, reply_rx, deregister_signal_rx, Channel::Datanode(1));

        let reply = MailboxMessage {
            id: 1,
            subject: "test-subject".to_string(),
            from: "test-from".to_string(),
            to: "test-to".to_string(),
            timestamp_millis: current_time_millis(),
            payload: None,
            header: None,
        };
        // Deliver the reply from another task.
        tokio::spawn(async move { reply_tx.send(Ok(reply)) });

        let received = receiver.await.unwrap();
        assert_eq!(received.id, 1);
        assert_eq!(received.subject, "test-subject");
        assert_eq!(received.from, "test-from");
        assert_eq!(received.to, "test-to");
    }

    /// A deregister signal with no reply resolves to `MailboxChannelClosed`.
    #[tokio::test]
    async fn test_mailbox_receiver_deregister_signal() {
        // The reply sender is kept alive but never used.
        let (_reply_tx, reply_rx) = oneshot::channel();
        let (deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
        let receiver =
            MailboxReceiver::new(1, reply_rx, deregister_signal_rx, Channel::Datanode(1));

        // Sends the deregister signal
        tokio::spawn(async move {
            let _ = deregister_signal_tx.send(true);
        });

        let failure = receiver.await.unwrap_err();
        assert_matches!(failure, error::Error::MailboxChannelClosed { .. });
    }
}