1use std::fmt::{Display, Formatter};
16use std::ops::Range;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use std::time::Duration;
21
22use api::v1::meta::{MailboxMessage, Role};
23use futures::{Future, FutureExt};
24use tokio::sync::oneshot;
25
26use crate::error::{self, Result};
27use crate::handler::{DeregisterSignalReceiver, PusherId};
28
/// Shared, reference-counted handle to a [`Mailbox`] trait object.
pub type MailboxRef = Arc<dyn Mailbox>;

/// Numeric identifier of a mailbox message (see [`MailboxReceiver::message_id`]).
pub type MessageId = u64;
32
/// Addresses a single peer by its role and numeric id.
///
/// Each variant carries the id of the peer playing that role.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Channel {
    /// A datanode with the given id.
    Datanode(u64),
    /// A frontend with the given id.
    Frontend(u64),
    /// A flownode with the given id.
    Flownode(u64),
}
39
40impl Display for Channel {
41 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Channel::Datanode(id) => {
44 write!(f, "Datanode-{}", id)
45 }
46 Channel::Frontend(id) => {
47 write!(f, "Frontend-{}", id)
48 }
49 Channel::Flownode(id) => {
50 write!(f, "Flownode-{}", id)
51 }
52 }
53 }
54}
55
56impl Channel {
57 pub(crate) fn pusher_id(&self) -> PusherId {
58 match self {
59 Channel::Datanode(id) => PusherId::new(Role::Datanode, *id),
60 Channel::Frontend(id) => PusherId::new(Role::Frontend, *id),
61 Channel::Flownode(id) => PusherId::new(Role::Flownode, *id),
62 }
63 }
64}
/// Selects a whole role as the target of a broadcast, as opposed to
/// [`Channel`], which addresses one peer by id.
///
/// The derives mirror those on [`Channel`] so that values can be logged with
/// `{:?}`, compared in assertions and passed around by value.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum BroadcastChannel {
    /// Every datanode.
    Datanode,
    /// Every frontend.
    Frontend,
    /// Every flownode.
    Flownode,
}
70
71impl BroadcastChannel {
72 pub(crate) fn pusher_range(&self) -> Range<String> {
73 match self {
74 BroadcastChannel::Datanode => Range {
75 start: format!("{}-", Role::Datanode as i32),
76 end: format!("{}-", Role::Frontend as i32),
77 },
78 BroadcastChannel::Frontend => Range {
79 start: format!("{}-", Role::Frontend as i32),
80 end: format!("{}-", Role::Flownode as i32),
81 },
82 BroadcastChannel::Flownode => Range {
83 start: format!("{}-", Role::Flownode as i32),
84 end: format!("{}-", Role::Flownode as i32 + 1),
85 },
86 }
87 }
88}
89
/// Future that resolves to the reply for a mailbox message, or to an error.
///
/// A receiver starts in [`MailboxReceiver::Init`] and switches to
/// [`MailboxReceiver::Polling`] the first time it is polled.
pub enum MailboxReceiver {
    /// Not polled yet: holds the pieces needed to build the waiting future.
    Init {
        /// Id of the message this receiver belongs to.
        message_id: MessageId,
        /// Channel this receiver is associated with.
        ch: Channel,
        /// Receives the reply (or an error). Wrapped in `Option` so it can be
        /// moved out on the first poll.
        rx: Option<oneshot::Receiver<Result<MailboxMessage>>>,
        /// Receiver for the pusher deregister signal. Wrapped in `Option` so it
        /// can be moved out on the first poll.
        pusher_deregister_signal_receiver: Option<DeregisterSignalReceiver>,
    },
    /// Polled at least once: drives the future built from the `Init` fields.
    Polling {
        /// Id of the message this receiver belongs to.
        message_id: MessageId,
        /// Channel this receiver is associated with.
        ch: Channel,
        /// Boxed future that waits for either the reply or the deregister signal.
        inner_future: Pin<Box<dyn Future<Output = Result<MailboxMessage>> + Send + 'static>>,
    },
}
106
107impl MailboxReceiver {
108 pub fn new(
109 message_id: MessageId,
110 rx: oneshot::Receiver<Result<MailboxMessage>>,
111 pusher_deregister_signal_receiver: DeregisterSignalReceiver,
112 ch: Channel,
113 ) -> Self {
114 Self::Init {
115 message_id,
116 rx: Some(rx),
117 pusher_deregister_signal_receiver: Some(pusher_deregister_signal_receiver),
118 ch,
119 }
120 }
121
122 pub fn message_id(&self) -> MessageId {
124 match self {
125 MailboxReceiver::Init { message_id, .. } => *message_id,
126 MailboxReceiver::Polling { message_id, .. } => *message_id,
127 }
128 }
129
130 pub fn channel(&self) -> Channel {
132 match self {
133 MailboxReceiver::Init { ch, .. } => *ch,
134 MailboxReceiver::Polling { ch, .. } => *ch,
135 }
136 }
137
138 async fn wait_for_message(
139 rx: oneshot::Receiver<Result<MailboxMessage>>,
140 mut pusher_deregister_signal_receiver: DeregisterSignalReceiver,
141 channel: Channel,
142 message_id: MessageId,
143 ) -> Result<MailboxMessage> {
144 tokio::select! {
145 res = rx => {
146 res.map_err(|e| error::MailboxReceiverSnafu {
147 id: message_id,
148 err_msg: e.to_string(),
149 }.build())?
150 }
151 _ = pusher_deregister_signal_receiver.changed() => {
152 Err(error::MailboxChannelClosedSnafu {
153 channel,
154 }.build())
155 }
156 }
157 }
158}
159
160impl Future for MailboxReceiver {
161 type Output = Result<MailboxMessage>;
162
163 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164 match &mut *self {
165 MailboxReceiver::Init {
166 message_id,
167 ch,
168 rx,
169 pusher_deregister_signal_receiver,
170 } => {
171 let polling = MailboxReceiver::Polling {
172 message_id: *message_id,
173 ch: *ch,
174 inner_future: Self::wait_for_message(
175 rx.take().expect("rx already taken"),
176 pusher_deregister_signal_receiver
177 .take()
178 .expect("pusher_deregister_signal_receiver already taken"),
179 *ch,
180 *message_id,
181 )
182 .boxed(),
183 };
184
185 *self = polling;
186 self.poll(cx)
187 }
188 MailboxReceiver::Polling { inner_future, .. } => {
189 let result = futures::ready!(inner_future.as_mut().poll(cx));
190 Poll::Ready(result)
191 }
192 }
193 }
194}
195
/// Messaging interface for exchanging [`MailboxMessage`]s with other nodes.
///
/// Only the method signatures are visible here; notes marked `NOTE(review)`
/// are assumptions to confirm against the implementation.
#[async_trait::async_trait]
pub trait Mailbox: Send + Sync {
    /// Sends `msg` on channel `ch` and returns a [`MailboxReceiver`], a future
    /// that resolves to a [`MailboxMessage`] or an error.
    ///
    /// NOTE(review): presumably `timeout` bounds how long the reply is waited
    /// for — confirm against the implementation.
    async fn send(
        &self,
        ch: &Channel,
        msg: MailboxMessage,
        timeout: Duration,
    ) -> Result<MailboxReceiver>;

    /// Sends `msg` on channel `ch` without returning a receiver for a reply.
    async fn send_oneway(&self, ch: &Channel, msg: MailboxMessage) -> Result<()>;

    /// Sends `msg` to the role selected by `ch`.
    ///
    /// NOTE(review): presumably delivered to every registered node of that
    /// role — confirm against the implementation.
    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;

    /// Hands an incoming result `maybe_msg` for message `id` to the mailbox.
    ///
    /// NOTE(review): presumably this completes the [`MailboxReceiver`] returned
    /// by the matching `send` — confirm against the implementation.
    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
}
211
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_time::util::current_time_millis;
    use tokio::sync::watch;

    use super::*;

    /// Each broadcast target maps to the `"<role>-".."<role + 1>-"` string range.
    #[test]
    fn test_channel_pusher_range() {
        let expectations = [
            (BroadcastChannel::Datanode, "0-", "1-"),
            (BroadcastChannel::Frontend, "1-", "2-"),
            (BroadcastChannel::Flownode, "2-", "3-"),
        ];

        for (target, start, end) in expectations {
            assert_eq!(target.pusher_range(), (start.to_string()..end.to_string()));
        }
    }

    /// A reply sent through the oneshot channel is what the receiver resolves to.
    #[tokio::test]
    async fn test_mailbox_receiver() {
        let (reply_tx, reply_rx) = oneshot::channel();
        // Keep the signal sender alive so the deregister branch never fires.
        let (_signal_tx, signal_rx) = watch::channel(false);
        let receiver = MailboxReceiver::new(1, reply_rx, signal_rx, Channel::Datanode(1));

        let reply = MailboxMessage {
            id: 1,
            subject: "test-subject".to_string(),
            from: "test-from".to_string(),
            to: "test-to".to_string(),
            timestamp_millis: current_time_millis(),
            payload: None,
        };
        tokio::spawn(async move { reply_tx.send(Ok(reply)) });

        let MailboxMessage {
            id,
            subject,
            from,
            to,
            ..
        } = receiver.await.unwrap();
        assert_eq!(id, 1);
        assert_eq!(subject, "test-subject");
        assert_eq!(from, "test-from");
        assert_eq!(to, "test-to");
    }

    /// A change on the deregister signal resolves the receiver with
    /// `MailboxChannelClosed`.
    #[tokio::test]
    async fn test_mailbox_receiver_deregister_signal() {
        // Keep the reply sender alive so only the signal can complete the receiver.
        let (_reply_tx, reply_rx) = oneshot::channel();
        let (signal_tx, signal_rx) = watch::channel(false);
        let receiver = MailboxReceiver::new(1, reply_rx, signal_rx, Channel::Datanode(1));

        tokio::spawn(async move {
            let _ = signal_tx.send(true);
        });

        let outcome = receiver.await;
        assert_matches!(
            outcome.unwrap_err(),
            error::Error::MailboxChannelClosed { .. }
        );
    }
}