meta_srv/pubsub/
publish.rs1use std::fmt::Debug;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use common_telemetry::error;
20
21use crate::pubsub::{Message, SubscriptionManager, Transport, UnsubscribeRequest};
22
23#[async_trait::async_trait]
26pub trait Publisher: Send + Sync {
27 async fn publish(&self, message: Message);
28}
29
30pub type PublisherRef = Arc<dyn Publisher>;
31
/// A publisher that holds a shared subscription manager of type `M`.
///
/// `T` is the transport type. No value of `T` is stored; it is carried only
/// at the type level through [`PhantomData`].
pub struct DefaultPublisher<M, T> {
    /// Shared handle to the subscription manager.
    subscription_manager: Arc<M>,
    /// Zero-sized marker tying this publisher to transport type `T`.
    _transport: PhantomData<T>,
}
37
38impl<M, T> DefaultPublisher<M, T> {
39 pub fn new(subscription_manager: Arc<M>) -> Self {
40 Self {
41 subscription_manager,
42 _transport: PhantomData,
43 }
44 }
45}
46
47#[async_trait::async_trait]
48impl<M, T> Publisher for DefaultPublisher<M, T>
49where
50 M: SubscriptionManager<T>,
51 T: Transport + Debug,
52{
53 async fn publish(&self, message: Message) {
54 let subscribers = self
55 .subscription_manager
56 .subscribers_by_topic(&message.topic());
57
58 for subscriber in subscribers {
59 if subscriber.transport_msg(message.clone()).await.is_err() {
60 let req = UnsubscribeRequest {
63 subscriber_id: subscriber.id(),
64 };
65
66 if let Err(e) = self.subscription_manager.unsubscribe(req.clone()) {
67 error!(e; "failed to unsubscribe, req: {:?}", req);
68 }
69 }
70 }
71 }
72}