1use std::fmt::{Display, Formatter};
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::time::Duration;
20
21use api::v1::meta::{MailboxMessage, Role};
22use futures::{Future, FutureExt};
23use tokio::sync::oneshot;
24
25use crate::error::{self, Result};
26use crate::handler::{DeregisterSignalReceiver, PusherId};
27
28pub type MailboxRef = Arc<dyn Mailbox>;
29
/// Numeric id of a mailbox message; it tags a `MailboxReceiver` and is the `id` argument of
/// `Mailbox::on_recv`.
pub type MessageId = u64;
31
/// Addresses a single peer by its role and numeric id.
///
/// The `u64` payload of every variant is the peer's id: it is printed after the role name by the
/// `Display` impl and forwarded, together with the matching role, to `PusherId::new`.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so a channel can be used as a map or set
/// key; the only payload is a `u64`, so both are total and cheap.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
    /// The datanode with the given id.
    Datanode(u64),
    /// The frontend with the given id.
    Frontend(u64),
    /// The flownode with the given id.
    Flownode(u64),
}
38
39impl Display for Channel {
40 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
41 match self {
42 Channel::Datanode(id) => {
43 write!(f, "Datanode-{}", id)
44 }
45 Channel::Frontend(id) => {
46 write!(f, "Frontend-{}", id)
47 }
48 Channel::Flownode(id) => {
49 write!(f, "Flownode-{}", id)
50 }
51 }
52 }
53}
54
55impl Channel {
56 pub(crate) fn pusher_id(&self) -> PusherId {
57 match self {
58 Channel::Datanode(id) => PusherId::new(Role::Datanode, *id),
59 Channel::Frontend(id) => PusherId::new(Role::Frontend, *id),
60 Channel::Flownode(id) => PusherId::new(Role::Flownode, *id),
61 }
62 }
63}
/// Selects a whole role, rather than a single node, as the target of a broadcast.
///
/// Each variant maps to the `Role` of the same name through `role()`.
///
/// The enum is fieldless, so it derives the same cheap value-type traits as [`Channel`]; in
/// particular `Debug` makes it usable in logs and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BroadcastChannel {
    /// Targets the datanode role.
    Datanode,
    /// Targets the frontend role.
    Frontend,
    /// Targets the flownode role.
    Flownode,
}
69
70impl BroadcastChannel {
71 pub(crate) fn role(&self) -> Role {
72 match self {
73 BroadcastChannel::Datanode => Role::Datanode,
74 BroadcastChannel::Frontend => Role::Frontend,
75 BroadcastChannel::Flownode => Role::Flownode,
76 }
77 }
78}
79
80pub enum MailboxReceiver {
82 Init {
83 message_id: MessageId,
84 ch: Channel,
85 rx: Option<oneshot::Receiver<Result<MailboxMessage>>>,
87 pusher_deregister_signal_receiver: Option<DeregisterSignalReceiver>,
89 },
90 Polling {
91 message_id: MessageId,
92 ch: Channel,
93 inner_future: Pin<Box<dyn Future<Output = Result<MailboxMessage>> + Send + 'static>>,
94 },
95}
96
97impl MailboxReceiver {
98 pub fn new(
99 message_id: MessageId,
100 rx: oneshot::Receiver<Result<MailboxMessage>>,
101 pusher_deregister_signal_receiver: DeregisterSignalReceiver,
102 ch: Channel,
103 ) -> Self {
104 Self::Init {
105 message_id,
106 rx: Some(rx),
107 pusher_deregister_signal_receiver: Some(pusher_deregister_signal_receiver),
108 ch,
109 }
110 }
111
112 pub fn message_id(&self) -> MessageId {
114 match self {
115 MailboxReceiver::Init { message_id, .. } => *message_id,
116 MailboxReceiver::Polling { message_id, .. } => *message_id,
117 }
118 }
119
120 pub fn channel(&self) -> Channel {
122 match self {
123 MailboxReceiver::Init { ch, .. } => *ch,
124 MailboxReceiver::Polling { ch, .. } => *ch,
125 }
126 }
127
128 async fn wait_for_message(
129 rx: oneshot::Receiver<Result<MailboxMessage>>,
130 mut pusher_deregister_signal_receiver: DeregisterSignalReceiver,
131 channel: Channel,
132 message_id: MessageId,
133 ) -> Result<MailboxMessage> {
134 tokio::select! {
135 res = rx => {
136 res.map_err(|e| error::MailboxReceiverSnafu {
137 id: message_id,
138 err_msg: e.to_string(),
139 }.build())?
140 }
141 _ = pusher_deregister_signal_receiver.changed() => {
142 Err(error::MailboxChannelClosedSnafu {
143 channel,
144 }.build())
145 }
146 }
147 }
148}
149
150impl Future for MailboxReceiver {
151 type Output = Result<MailboxMessage>;
152
153 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154 match &mut *self {
155 MailboxReceiver::Init {
156 message_id,
157 ch,
158 rx,
159 pusher_deregister_signal_receiver,
160 } => {
161 let polling = MailboxReceiver::Polling {
162 message_id: *message_id,
163 ch: *ch,
164 inner_future: Self::wait_for_message(
165 rx.take().expect("rx already taken"),
166 pusher_deregister_signal_receiver
167 .take()
168 .expect("pusher_deregister_signal_receiver already taken"),
169 *ch,
170 *message_id,
171 )
172 .boxed(),
173 };
174
175 *self = polling;
176 self.poll(cx)
177 }
178 MailboxReceiver::Polling { inner_future, .. } => {
179 let result = futures::ready!(inner_future.as_mut().poll(cx));
180 Poll::Ready(result)
181 }
182 }
183 }
184}
185
186#[async_trait::async_trait]
187pub trait Mailbox: Send + Sync {
188 async fn send(
189 &self,
190 ch: &Channel,
191 msg: MailboxMessage,
192 timeout: Duration,
193 ) -> Result<MailboxReceiver>;
194
195 async fn send_oneway(&self, ch: &Channel, msg: MailboxMessage) -> Result<()>;
196
197 async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
198
199 async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
200}
201
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_time::util::current_time_millis;
    use tokio::sync::watch;

    use super::*;

    /// Every broadcast target maps to the role of the same name.
    #[test]
    fn test_broadcast_channel_role() {
        let expectations = [
            (BroadcastChannel::Datanode, Role::Datanode),
            (BroadcastChannel::Frontend, Role::Frontend),
            (BroadcastChannel::Flownode, Role::Flownode),
        ];

        for (target, expected_role) in expectations {
            assert_eq!(target.role(), expected_role);
        }
    }

    /// A message sent on the oneshot channel is what the receiver resolves to.
    #[tokio::test]
    async fn test_mailbox_receiver() {
        let (reply_tx, reply_rx) = oneshot::channel();
        // The watch sender must stay alive for the whole test so the deregister branch does
        // not complete.
        let (_deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
        let receiver =
            MailboxReceiver::new(1, reply_rx, deregister_signal_rx, Channel::Datanode(1));

        let reply = MailboxMessage {
            id: 1,
            subject: "test-subject".to_string(),
            from: "test-from".to_string(),
            to: "test-to".to_string(),
            timestamp_millis: current_time_millis(),
            payload: None,
            header: None,
        };
        tokio::spawn(async move { reply_tx.send(Ok(reply)) });

        let received = receiver.await.unwrap();
        assert_eq!(received.id, 1);
        assert_eq!(received.subject, "test-subject");
        assert_eq!(received.from, "test-from");
        assert_eq!(received.to, "test-to");
    }

    /// Firing the deregister signal ends the wait with a channel-closed error.
    #[tokio::test]
    async fn test_mailbox_receiver_deregister_signal() {
        // The oneshot sender must stay alive so only the deregister branch can complete.
        let (_reply_tx, reply_rx) = oneshot::channel();
        let (deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
        let receiver =
            MailboxReceiver::new(1, reply_rx, deregister_signal_rx, Channel::Datanode(1));

        tokio::spawn(async move {
            let _ = deregister_signal_tx.send(true);
        });

        let outcome = receiver.await;
        assert_matches!(
            outcome.unwrap_err(),
            error::Error::MailboxChannelClosed { .. }
        );
    }
}