meta_srv/pubsub/
subscriber.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use snafu::ResultExt;
18use tokio::sync::mpsc::Sender;
19
20use crate::error::{self, Result};
21use crate::pubsub::Message;
22
/// A single pubsub subscriber: an identity (`id`, `name`) paired with the
/// transporter `T` that carries messages from meta to that subscriber.
#[derive(Debug)]
pub struct Subscriber<T> {
    /// Globally unique identifier of this subscriber; assigned by the leader meta.
    id: u32,
    /// Name of this subscriber, as supplied by the subscriber itself.
    name: String,
    /// Channel over which meta transports messages to this subscriber.
    transporter: T,
}
32
33pub type SubscriberRef<T> = Arc<Subscriber<T>>;
34
35impl<T> Subscriber<T> {
36    pub fn new(id: u32, name: impl Into<String>, transporter: T) -> Self {
37        let name = name.into();
38
39        Self {
40            id,
41            name,
42            transporter,
43        }
44    }
45
46    pub fn id(&self) -> u32 {
47        self.id
48    }
49
50    pub fn name(&self) -> &str {
51        &self.name
52    }
53}
54
55impl<T> Subscriber<T>
56where
57    T: Transport,
58{
59    pub async fn transport_msg(&self, message: Message) -> Result<()> {
60        self.transporter.transport_msg(message).await
61    }
62}
63
64/// This trait defines how messages are delivered from meta to the subscriber.
65#[async_trait::async_trait]
66pub trait Transport: Send + Sync {
67    async fn transport_msg(&self, msg: Message) -> Result<()>;
68}
69
70#[async_trait::async_trait]
71impl Transport for Sender<Message> {
72    async fn transport_msg(&self, msg: Message) -> Result<()> {
73        self.send(msg).await.context(error::PublishMessageSnafu)
74    }
75}