store_api/logstore/
provider.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::Display;
16use std::sync::Arc;
17
18use crate::storage::RegionId;
19
/// The provider of the Kafka log store.
///
/// Carries the name of a Kafka topic. Equality and hashing are derived, so two
/// providers compare equal exactly when their topics are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaProvider {
    /// The topic name; this is the text the `Display` impl writes out.
    pub topic: String,
}
25
26impl KafkaProvider {
27    pub fn new(topic: String) -> Self {
28        Self { topic }
29    }
30
31    /// Returns the type name.
32    pub fn type_name() -> &'static str {
33        "KafkaProvider"
34    }
35}
36
37impl Display for KafkaProvider {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "{}", self.topic)
40    }
41}
42
/// The provider of the raft-engine log store.
///
/// A small `Copy` wrapper around a `u64` id. `Hash` is derived together with
/// `Eq` so the provider can be used as a key in hashed collections, in line
/// with [`KafkaProvider`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RaftEngineProvider {
    /// Raw id; `Provider`'s `Display` impl renders it through
    /// `RegionId::from_u64`.
    pub id: u64,
}
48
49impl RaftEngineProvider {
50    pub fn new(id: u64) -> Self {
51        Self { id }
52    }
53
54    /// Returns the type name.
55    pub fn type_name() -> &'static str {
56        "RaftEngineProvider"
57    }
58}
59
/// The Provider of LogStore.
///
/// One variant per kind of log store known to this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Provider {
    /// Raft-engine log store, carrying a [`RaftEngineProvider`] (a `u64` id).
    RaftEngine(RaftEngineProvider),
    /// Kafka log store. The [`KafkaProvider`] sits behind an [`Arc`], so
    /// cloning a `Provider` clones the pointer rather than the topic string.
    /// This is the only variant for which `is_remote_wal` returns `true`.
    Kafka(Arc<KafkaProvider>),
    /// Carries no data.
    // NOTE(review): presumably selected when no WAL is written — confirm
    // against the callers.
    Noop,
}
67
68impl Display for Provider {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match &self {
71            Provider::RaftEngine(provider) => {
72                write!(f, "region: {}", RegionId::from_u64(provider.id))
73            }
74            Provider::Kafka(provider) => write!(f, "topic: {}", provider.topic),
75            Provider::Noop => write!(f, "noop"),
76        }
77    }
78}
79
80impl Provider {
81    pub fn raft_engine_provider(id: u64) -> Provider {
82        Provider::RaftEngine(RaftEngineProvider { id })
83    }
84
85    pub fn kafka_provider(topic: String) -> Provider {
86        Provider::Kafka(Arc::new(KafkaProvider { topic }))
87    }
88
89    pub fn noop_provider() -> Provider {
90        Provider::Noop
91    }
92
93    /// Returns true if it's remote WAL.
94    pub fn is_remote_wal(&self) -> bool {
95        matches!(self, Provider::Kafka(_))
96    }
97
98    /// Returns the type name.
99    pub fn type_name(&self) -> &'static str {
100        match self {
101            Provider::RaftEngine(_) => RaftEngineProvider::type_name(),
102            Provider::Kafka(_) => KafkaProvider::type_name(),
103            Provider::Noop => "Noop",
104        }
105    }
106
107    /// Returns the reference of [`RaftEngineProvider`] if it's the type of [`LogStoreProvider::RaftEngine`].
108    pub fn as_raft_engine_provider(&self) -> Option<&RaftEngineProvider> {
109        if let Provider::RaftEngine(ns) = self {
110            return Some(ns);
111        }
112        None
113    }
114
115    /// Returns the reference of [`KafkaProvider`] if it's the type of [`LogStoreProvider::Kafka`].
116    pub fn as_kafka_provider(&self) -> Option<&Arc<KafkaProvider>> {
117        if let Provider::Kafka(ns) = self {
118            return Some(ns);
119        }
120        None
121    }
122}