store_api/logstore/
provider.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::Display;
16use std::sync::Arc;
17
18use crate::logstore::LogStore;
19use crate::storage::RegionId;
20
/// The provider of the Kafka log store.
///
/// Its only state is the Kafka topic name, so two providers compare (and hash)
/// equal exactly when their topics are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaProvider {
    /// Name of the Kafka topic.
    pub topic: String,
}

impl KafkaProvider {
    /// Creates a provider for the given `topic`.
    pub fn new(topic: String) -> Self {
        Self { topic }
    }

    /// Returns the type name.
    pub fn type_name() -> &'static str {
        "KafkaProvider"
    }
}

/// Formats the provider as its bare topic name, without any decoration.
impl Display for KafkaProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.topic)
    }
}
43
/// The provider of the raft engine log store.
///
/// A small `Copy` value wrapping a single numeric id. `Hash` is derived so the
/// type can be used as a map/set key, matching the derives on `KafkaProvider`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RaftEngineProvider {
    /// Numeric id of this provider.
    pub id: u64,
}

impl RaftEngineProvider {
    /// Creates a provider with the given `id`.
    pub fn new(id: u64) -> Self {
        Self { id }
    }

    /// Returns the type name.
    pub fn type_name() -> &'static str {
        "RaftEngineProvider"
    }
}
60
61/// The Provider of LogStore
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum Provider {
64    RaftEngine(RaftEngineProvider),
65    Kafka(Arc<KafkaProvider>),
66    Noop,
67}
68
69impl Display for Provider {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        match &self {
72            Provider::RaftEngine(provider) => {
73                write!(f, "RaftEngine(region={})", RegionId::from_u64(provider.id))
74            }
75            Provider::Kafka(provider) => write!(f, "Kafka(topic={})", provider.topic),
76            Provider::Noop => write!(f, "Noop"),
77        }
78    }
79}
80
81impl Provider {
82    /// Returns the initial flushed entry id of the provider.
83    /// This is used to initialize the flushed entry id of the region when creating the region from scratch.
84    ///
85    /// Currently only used for remote WAL.
86    /// For local WAL, the initial flushed entry id is 0.
87    pub fn initial_flushed_entry_id<S: LogStore>(&self, wal: &S) -> u64 {
88        if matches!(self, Provider::Kafka(_)) {
89            return wal.latest_entry_id(self).unwrap_or(0);
90        }
91        0
92    }
93
94    pub fn raft_engine_provider(id: u64) -> Provider {
95        Provider::RaftEngine(RaftEngineProvider { id })
96    }
97
98    pub fn kafka_provider(topic: String) -> Provider {
99        Provider::Kafka(Arc::new(KafkaProvider { topic }))
100    }
101
102    pub fn noop_provider() -> Provider {
103        Provider::Noop
104    }
105
106    /// Returns true if it's remote WAL.
107    pub fn is_remote_wal(&self) -> bool {
108        matches!(self, Provider::Kafka(_))
109    }
110
111    /// Returns the type name.
112    pub fn type_name(&self) -> &'static str {
113        match self {
114            Provider::RaftEngine(_) => RaftEngineProvider::type_name(),
115            Provider::Kafka(_) => KafkaProvider::type_name(),
116            Provider::Noop => "Noop",
117        }
118    }
119
120    /// Returns the reference of [`RaftEngineProvider`] if it's the type of [`LogStoreProvider::RaftEngine`].
121    pub fn as_raft_engine_provider(&self) -> Option<&RaftEngineProvider> {
122        if let Provider::RaftEngine(ns) = self {
123            return Some(ns);
124        }
125        None
126    }
127
128    /// Returns the reference of [`KafkaProvider`] if it's the type of [`LogStoreProvider::Kafka`].
129    pub fn as_kafka_provider(&self) -> Option<&Arc<KafkaProvider>> {
130        if let Provider::Kafka(ns) = self {
131            return Some(ns);
132        }
133        None
134    }
135}