1pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::common::{
23 DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE,
24 DEFAULT_FLUSH_TRIGGER_SIZE,
25};
26use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
27use crate::config::raft_engine::RaftEngineConfig;
28use crate::error::{Error, UnsupportedWalProviderSnafu};
29
30#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
32#[serde(tag = "provider", rename_all = "snake_case")]
33#[allow(clippy::large_enum_variant)]
34pub enum MetasrvWalConfig {
35 #[default]
36 RaftEngine,
37 Kafka(MetasrvKafkaConfig),
38}
39
40#[allow(clippy::large_enum_variant)]
41#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
43#[serde(tag = "provider", rename_all = "snake_case")]
44pub enum DatanodeWalConfig {
45 RaftEngine(RaftEngineConfig),
46 Kafka(DatanodeKafkaConfig),
47 Noop,
48}
49
50impl Default for DatanodeWalConfig {
51 fn default() -> Self {
52 Self::RaftEngine(RaftEngineConfig::default())
53 }
54}
55
56impl TryFrom<DatanodeWalConfig> for MetasrvWalConfig {
57 type Error = Error;
58
59 fn try_from(config: DatanodeWalConfig) -> Result<Self, Self::Error> {
60 match config {
61 DatanodeWalConfig::RaftEngine(_) => Ok(Self::RaftEngine),
62 DatanodeWalConfig::Kafka(config) => Ok(Self::Kafka(MetasrvKafkaConfig {
63 connection: config.connection,
64 kafka_topic: config.kafka_topic,
65 auto_create_topics: config.auto_create_topics,
66 auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
68 auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
70 flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
72 checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
74 })),
75 DatanodeWalConfig::Noop => UnsupportedWalProviderSnafu {
76 provider: "noop".to_string(),
77 }
78 .fail(),
79 }
80 }
81}
82
83impl MetasrvWalConfig {
84 pub fn enable_active_wal_pruning(&self) -> bool {
86 match self {
87 MetasrvWalConfig::RaftEngine => false,
88 MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
89 }
90 }
91
92 pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
94 match self {
95 MetasrvWalConfig::RaftEngine => None,
96 MetasrvWalConfig::Kafka(config) => Some(config),
97 }
98 }
99}
100
101impl From<MetasrvWalConfig> for DatanodeWalConfig {
102 fn from(config: MetasrvWalConfig) -> Self {
103 match config {
104 MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
105 MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
106 connection: config.connection,
107 kafka_topic: config.kafka_topic,
108 ..Default::default()
109 }),
110 }
111 }
112}
113
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;
    use kafka::common::{
        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
    };
    use tests::kafka::common::KafkaTopicConfig;

    use super::*;
    use crate::TopicSelectorType;
    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};

    /// Connection settings the Kafka TOML fixture is expected to decode into.
    fn expected_connection() -> KafkaConnectionConfig {
        let sasl = KafkaClientSasl {
            config: KafkaClientSaslConfig::ScramSha512 {
                username: "hi".to_string(),
                password: "test".to_string(),
            },
        };
        let tls = KafkaClientTls {
            server_ca_cert_path: Some("/path/to/server.pem".to_string()),
            client_cert_path: None,
            client_key_path: None,
        };
        KafkaConnectionConfig {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            sasl: Some(sasl),
            tls: Some(tls),
        }
    }

    /// Topic settings the Kafka TOML fixture is expected to decode into.
    fn expected_topic() -> KafkaTopicConfig {
        KafkaTopicConfig {
            num_topics: 32,
            selector_type: TopicSelectorType::RoundRobin,
            topic_name_prefix: "greptimedb_wal_topic".to_string(),
            num_partitions: 1,
            replication_factor: 1,
            create_topic_timeout: Duration::from_secs(30),
        }
    }

    #[test]
    fn test_toml_raft_engine() {
        // Only the provider key: both sides decode to their raft-engine defaults.
        let provider_only = r#"
            provider = "raft_engine"
        "#;
        assert_eq!(
            toml::from_str::<MetasrvWalConfig>(provider_only).unwrap(),
            MetasrvWalConfig::RaftEngine
        );
        assert_eq!(
            toml::from_str::<DatanodeWalConfig>(provider_only).unwrap(),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Kafka-style keys next to the raft-engine provider still decode to the
        // default raft-engine settings.
        let with_kafka_keys = r#"
            provider = "raft_engine"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
        "#;
        assert_eq!(
            toml::from_str::<DatanodeWalConfig>(with_kafka_keys).unwrap(),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Explicit raft-engine settings override the corresponding defaults.
        let with_overrides = r#"
            provider = "raft_engine"
            file_size = "4MB"
            purge_threshold = "1GB"
            purge_interval = "5mins"
        "#;
        let parsed = toml::from_str::<DatanodeWalConfig>(with_overrides).unwrap();
        let raft_engine = RaftEngineConfig {
            file_size: ReadableSize::mb(4),
            purge_threshold: ReadableSize::gb(1),
            purge_interval: Duration::from_secs(5 * 60),
            ..Default::default()
        };
        assert_eq!(parsed, DatanodeWalConfig::RaftEngine(raft_engine));
    }

    #[test]
    fn test_toml_kafka() {
        let kafka_toml = r#"
            provider = "kafka"
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_bytes = "1MB"
            consumer_wait_timeout = "100ms"
            num_topics = 32
            num_partitions = 1
            selector_type = "round_robin"
            replication_factor = 1
            create_topic_timeout = "30s"
            topic_name_prefix = "greptimedb_wal_topic"
            [tls]
            server_ca_cert_path = "/path/to/server.pem"
            [sasl]
            type = "SCRAM-SHA-512"
            username = "hi"
            password = "test"
        "#;

        // Metasrv view of the fixture.
        let metasrv_parsed = toml::from_str::<MetasrvWalConfig>(kafka_toml).unwrap();
        let metasrv_expected = MetasrvKafkaConfig {
            connection: expected_connection(),
            kafka_topic: expected_topic(),
            auto_create_topics: true,
            auto_prune_interval: Duration::from_mins(30),
            auto_prune_parallelism: 10,
            flush_trigger_size: ReadableSize::mb(512),
            checkpoint_trigger_size: ReadableSize::mb(128),
        };
        assert_eq!(metasrv_parsed, MetasrvWalConfig::Kafka(metasrv_expected));

        // Datanode view of the very same fixture.
        let datanode_parsed = toml::from_str::<DatanodeWalConfig>(kafka_toml).unwrap();
        let datanode_expected = DatanodeKafkaConfig {
            connection: expected_connection(),
            max_batch_bytes: ReadableSize::mb(1),
            consumer_wait_timeout: Duration::from_millis(100),
            kafka_topic: expected_topic(),
            ..Default::default()
        };
        assert_eq!(datanode_parsed, DatanodeWalConfig::Kafka(datanode_expected));
    }
}