meta_srv/service/
mailbox.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16use std::ops::Range;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use std::time::Duration;
21
22use api::v1::meta::{MailboxMessage, Role};
23use futures::{Future, FutureExt};
24use tokio::sync::oneshot;
25
26use crate::error::{self, Result};
27use crate::handler::{DeregisterSignalReceiver, PusherId};
28
29pub type MailboxRef = Arc<dyn Mailbox>;
30
31pub type MessageId = u64;
32
33#[derive(Debug, PartialEq, Clone, Copy)]
34pub enum Channel {
35    Datanode(u64),
36    Frontend(u64),
37    Flownode(u64),
38}
39
40impl Display for Channel {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        match self {
43            Channel::Datanode(id) => {
44                write!(f, "Datanode-{}", id)
45            }
46            Channel::Frontend(id) => {
47                write!(f, "Frontend-{}", id)
48            }
49            Channel::Flownode(id) => {
50                write!(f, "Flownode-{}", id)
51            }
52        }
53    }
54}
55
56impl Channel {
57    pub(crate) fn pusher_id(&self) -> PusherId {
58        match self {
59            Channel::Datanode(id) => PusherId::new(Role::Datanode, *id),
60            Channel::Frontend(id) => PusherId::new(Role::Frontend, *id),
61            Channel::Flownode(id) => PusherId::new(Role::Flownode, *id),
62        }
63    }
64}
65pub enum BroadcastChannel {
66    Datanode,
67    Frontend,
68    Flownode,
69}
70
71impl BroadcastChannel {
72    pub(crate) fn pusher_range(&self) -> Range<String> {
73        match self {
74            BroadcastChannel::Datanode => Range {
75                start: format!("{}-", Role::Datanode as i32),
76                end: format!("{}-", Role::Frontend as i32),
77            },
78            BroadcastChannel::Frontend => Range {
79                start: format!("{}-", Role::Frontend as i32),
80                end: format!("{}-", Role::Flownode as i32),
81            },
82            BroadcastChannel::Flownode => Range {
83                start: format!("{}-", Role::Flownode as i32),
84                end: format!("{}-", Role::Flownode as i32 + 1),
85            },
86        }
87    }
88}
89
90/// The mailbox receiver
91pub enum MailboxReceiver {
92    Init {
93        message_id: MessageId,
94        ch: Channel,
95        /// The [`MailboxMessage`] receiver
96        rx: Option<oneshot::Receiver<Result<MailboxMessage>>>,
97        /// The pusher deregister signal receiver
98        pusher_deregister_signal_receiver: Option<DeregisterSignalReceiver>,
99    },
100    Polling {
101        message_id: MessageId,
102        ch: Channel,
103        inner_future: Pin<Box<dyn Future<Output = Result<MailboxMessage>> + Send + 'static>>,
104    },
105}
106
107impl MailboxReceiver {
108    pub fn new(
109        message_id: MessageId,
110        rx: oneshot::Receiver<Result<MailboxMessage>>,
111        pusher_deregister_signal_receiver: DeregisterSignalReceiver,
112        ch: Channel,
113    ) -> Self {
114        Self::Init {
115            message_id,
116            rx: Some(rx),
117            pusher_deregister_signal_receiver: Some(pusher_deregister_signal_receiver),
118            ch,
119        }
120    }
121
122    /// Get the message id of the mailbox receiver
123    pub fn message_id(&self) -> MessageId {
124        match self {
125            MailboxReceiver::Init { message_id, .. } => *message_id,
126            MailboxReceiver::Polling { message_id, .. } => *message_id,
127        }
128    }
129
130    /// Get the channel of the mailbox receiver
131    pub fn channel(&self) -> Channel {
132        match self {
133            MailboxReceiver::Init { ch, .. } => *ch,
134            MailboxReceiver::Polling { ch, .. } => *ch,
135        }
136    }
137
138    async fn wait_for_message(
139        rx: oneshot::Receiver<Result<MailboxMessage>>,
140        mut pusher_deregister_signal_receiver: DeregisterSignalReceiver,
141        channel: Channel,
142        message_id: MessageId,
143    ) -> Result<MailboxMessage> {
144        tokio::select! {
145            res = rx => {
146                res.map_err(|e| error::MailboxReceiverSnafu {
147                    id: message_id,
148                    err_msg: e.to_string(),
149                }.build())?
150            }
151            _ = pusher_deregister_signal_receiver.changed() => {
152                 Err(error::MailboxChannelClosedSnafu {
153                    channel,
154                }.build())
155            }
156        }
157    }
158}
159
160impl Future for MailboxReceiver {
161    type Output = Result<MailboxMessage>;
162
163    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164        match &mut *self {
165            MailboxReceiver::Init {
166                message_id,
167                ch,
168                rx,
169                pusher_deregister_signal_receiver,
170            } => {
171                let polling = MailboxReceiver::Polling {
172                    message_id: *message_id,
173                    ch: *ch,
174                    inner_future: Self::wait_for_message(
175                        rx.take().expect("rx already taken"),
176                        pusher_deregister_signal_receiver
177                            .take()
178                            .expect("pusher_deregister_signal_receiver already taken"),
179                        *ch,
180                        *message_id,
181                    )
182                    .boxed(),
183                };
184
185                *self = polling;
186                self.poll(cx)
187            }
188            MailboxReceiver::Polling { inner_future, .. } => {
189                let result = futures::ready!(inner_future.as_mut().poll(cx));
190                Poll::Ready(result)
191            }
192        }
193    }
194}
195
196#[async_trait::async_trait]
197pub trait Mailbox: Send + Sync {
198    async fn send(
199        &self,
200        ch: &Channel,
201        msg: MailboxMessage,
202        timeout: Duration,
203    ) -> Result<MailboxReceiver>;
204
205    async fn send_oneway(&self, ch: &Channel, msg: MailboxMessage) -> Result<()>;
206
207    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
208
209    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
210}
211
212#[cfg(test)]
213mod tests {
214    use std::assert_matches::assert_matches;
215
216    use common_time::util::current_time_millis;
217    use tokio::sync::watch;
218
219    use super::*;
220
221    #[test]
222    fn test_channel_pusher_range() {
223        assert_eq!(
224            BroadcastChannel::Datanode.pusher_range(),
225            ("0-".to_string().."1-".to_string())
226        );
227        assert_eq!(
228            BroadcastChannel::Frontend.pusher_range(),
229            ("1-".to_string().."2-".to_string())
230        );
231        assert_eq!(
232            BroadcastChannel::Flownode.pusher_range(),
233            ("2-".to_string().."3-".to_string())
234        );
235    }
236
237    #[tokio::test]
238    async fn test_mailbox_receiver() {
239        let (tx, rx) = oneshot::channel();
240        let (_deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
241        let receiver = MailboxReceiver::new(1, rx, deregister_signal_rx, Channel::Datanode(1));
242
243        let timestamp_millis = current_time_millis();
244        tokio::spawn(async move {
245            tx.send(Ok(MailboxMessage {
246                id: 1,
247                subject: "test-subject".to_string(),
248                from: "test-from".to_string(),
249                to: "test-to".to_string(),
250                timestamp_millis,
251                payload: None,
252            }))
253        });
254
255        let result = receiver.await.unwrap();
256        assert_eq!(result.id, 1);
257        assert_eq!(result.subject, "test-subject");
258        assert_eq!(result.from, "test-from");
259        assert_eq!(result.to, "test-to");
260    }
261
262    #[tokio::test]
263    async fn test_mailbox_receiver_deregister_signal() {
264        let (_tx, rx) = oneshot::channel();
265        let (deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
266        let receiver = MailboxReceiver::new(1, rx, deregister_signal_rx, Channel::Datanode(1));
267
268        // Sends the deregister signal
269        tokio::spawn(async move {
270            let _ = deregister_signal_tx.send(true);
271        });
272
273        let err = receiver.await.unwrap_err();
274        assert_matches!(err, error::Error::MailboxChannelClosed { .. });
275    }
276}