store_api/logstore/
provider.rs1use std::fmt::Display;
16use std::sync::Arc;
17
18use crate::logstore::LogStore;
19use crate::storage::RegionId;
20
/// Provider that is identified solely by a topic name.
///
/// Two values compare (and hash) equal exactly when their `topic` strings do.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaProvider {
    /// Name of the topic this provider points at.
    pub topic: String,
}

impl KafkaProvider {
    /// Builds a provider that owns the given `topic` string.
    pub fn new(topic: String) -> Self {
        KafkaProvider { topic }
    }

    /// Returns the fixed label `"KafkaProvider"`.
    ///
    /// This is an associated function: no instance is needed to call it.
    pub fn type_name() -> &'static str {
        "KafkaProvider"
    }
}

impl Display for KafkaProvider {
    /// Writes the topic name verbatim, without any surrounding decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `write_str` emits the text as-is and, like the nested `write!` it
        // replaces, does not apply the caller's width/fill flags.
        f.write_str(&self.topic)
    }
}
43
/// Provider that is identified by a single numeric id.
///
/// The type is `Copy`, so it can be passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RaftEngineProvider {
    /// Raw `u64` identifier. `Provider`'s `Display` impl renders it through
    /// `RegionId::from_u64`, i.e. it is treated as a region id there.
    pub id: u64,
}

impl RaftEngineProvider {
    /// Builds a provider wrapping the given `id`.
    pub fn new(id: u64) -> Self {
        RaftEngineProvider { id }
    }

    /// Returns the fixed label `"RaftEngineProvider"`.
    ///
    /// This is an associated function: no instance is needed to call it.
    pub fn type_name() -> &'static str {
        "RaftEngineProvider"
    }
}
60
61#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum Provider {
64 RaftEngine(RaftEngineProvider),
65 Kafka(Arc<KafkaProvider>),
66 Noop,
67}
68
69impl Display for Provider {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 match &self {
72 Provider::RaftEngine(provider) => {
73 write!(f, "RaftEngine(region={})", RegionId::from_u64(provider.id))
74 }
75 Provider::Kafka(provider) => write!(f, "Kafka(topic={})", provider.topic),
76 Provider::Noop => write!(f, "Noop"),
77 }
78 }
79}
80
81impl Provider {
82 pub fn initial_flushed_entry_id<S: LogStore>(&self, wal: &S) -> u64 {
88 if matches!(self, Provider::Kafka(_)) {
89 return wal.latest_entry_id(self).unwrap_or(0);
90 }
91 0
92 }
93
94 pub fn raft_engine_provider(id: u64) -> Provider {
95 Provider::RaftEngine(RaftEngineProvider { id })
96 }
97
98 pub fn kafka_provider(topic: String) -> Provider {
99 Provider::Kafka(Arc::new(KafkaProvider { topic }))
100 }
101
102 pub fn noop_provider() -> Provider {
103 Provider::Noop
104 }
105
106 pub fn is_remote_wal(&self) -> bool {
108 matches!(self, Provider::Kafka(_))
109 }
110
111 pub fn type_name(&self) -> &'static str {
113 match self {
114 Provider::RaftEngine(_) => RaftEngineProvider::type_name(),
115 Provider::Kafka(_) => KafkaProvider::type_name(),
116 Provider::Noop => "Noop",
117 }
118 }
119
120 pub fn as_raft_engine_provider(&self) -> Option<&RaftEngineProvider> {
122 if let Provider::RaftEngine(ns) = self {
123 return Some(ns);
124 }
125 None
126 }
127
128 pub fn as_kafka_provider(&self) -> Option<&Arc<KafkaProvider>> {
130 if let Provider::Kafka(ns) = self {
131 return Some(ns);
132 }
133 None
134 }
135}