1pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
23use crate::config::raft_engine::RaftEngineConfig;
24
25#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
27#[serde(tag = "provider", rename_all = "snake_case")]
28#[allow(clippy::large_enum_variant)]
29pub enum MetasrvWalConfig {
30 #[default]
31 RaftEngine,
32 Kafka(MetasrvKafkaConfig),
33}
34
35#[allow(clippy::large_enum_variant)]
36#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
38#[serde(tag = "provider", rename_all = "snake_case")]
39pub enum DatanodeWalConfig {
40 RaftEngine(RaftEngineConfig),
41 Kafka(DatanodeKafkaConfig),
42}
43
44impl Default for DatanodeWalConfig {
45 fn default() -> Self {
46 Self::RaftEngine(RaftEngineConfig::default())
47 }
48}
49
50impl From<DatanodeWalConfig> for MetasrvWalConfig {
51 fn from(config: DatanodeWalConfig) -> Self {
52 match config {
53 DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
54 DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
55 connection: config.connection,
56 kafka_topic: config.kafka_topic,
57 auto_create_topics: config.auto_create_topics,
58 auto_prune_interval: config.auto_prune_interval,
59 trigger_flush_threshold: config.trigger_flush_threshold,
60 auto_prune_parallelism: config.auto_prune_parallelism,
61 }),
62 }
63 }
64}
65
66impl MetasrvWalConfig {
67 pub fn enable_active_wal_pruning(&self) -> bool {
69 match self {
70 MetasrvWalConfig::RaftEngine => false,
71 MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
72 }
73 }
74
75 pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
77 match self {
78 MetasrvWalConfig::RaftEngine => None,
79 MetasrvWalConfig::Kafka(config) => Some(config),
80 }
81 }
82}
83
84impl From<MetasrvWalConfig> for DatanodeWalConfig {
85 fn from(config: MetasrvWalConfig) -> Self {
86 match config {
87 MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
88 MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
89 connection: config.connection,
90 kafka_topic: config.kafka_topic,
91 ..Default::default()
92 }),
93 }
94 }
95}
96
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;
    use kafka::common::{
        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
        KafkaTopicConfig,
    };

    use super::*;
    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
    use crate::TopicSelectorType;

    /// Connection settings the Kafka TOML fixture is expected to produce.
    fn expected_connection() -> KafkaConnectionConfig {
        let sasl = KafkaClientSasl {
            config: KafkaClientSaslConfig::ScramSha512 {
                username: "hi".to_string(),
                password: "test".to_string(),
            },
        };
        let tls = KafkaClientTls {
            server_ca_cert_path: Some("/path/to/server.pem".to_string()),
            client_cert_path: None,
            client_key_path: None,
        };
        KafkaConnectionConfig {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            sasl: Some(sasl),
            tls: Some(tls),
        }
    }

    /// Topic settings the Kafka TOML fixture is expected to produce.
    fn expected_topic() -> KafkaTopicConfig {
        KafkaTopicConfig {
            num_topics: 32,
            selector_type: TopicSelectorType::RoundRobin,
            topic_name_prefix: "greptimedb_wal_topic".to_string(),
            num_partitions: 1,
            replication_factor: 1,
            create_topic_timeout: Duration::from_secs(30),
        }
    }

    #[test]
    fn test_toml_raft_engine() {
        // Only the provider is given: both sides resolve to raft-engine defaults.
        let provider_only = r#"
            provider = "raft_engine"
        "#;
        assert_eq!(
            toml::from_str::<MetasrvWalConfig>(provider_only).unwrap(),
            MetasrvWalConfig::RaftEngine
        );
        assert_eq!(
            toml::from_str::<DatanodeWalConfig>(provider_only).unwrap(),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Kafka-style keys next to the raft-engine provider are expected to be
        // ignored, leaving the default raft-engine options.
        let with_kafka_keys = r#"
            provider = "raft_engine"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
        "#;
        assert_eq!(
            toml::from_str::<DatanodeWalConfig>(with_kafka_keys).unwrap(),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Raft-engine options given explicitly override the defaults.
        let with_raft_engine_keys = r#"
            provider = "raft_engine"
            file_size = "4MB"
            purge_threshold = "1GB"
            purge_interval = "5mins"
        "#;
        let parsed = toml::from_str::<DatanodeWalConfig>(with_raft_engine_keys).unwrap();
        let expected_raft_engine = RaftEngineConfig {
            file_size: ReadableSize::mb(4),
            purge_threshold: ReadableSize::gb(1),
            purge_interval: Duration::from_secs(5 * 60),
            ..Default::default()
        };
        assert_eq!(parsed, DatanodeWalConfig::RaftEngine(expected_raft_engine));
    }

    #[test]
    fn test_toml_kafka() {
        let toml_str = r#"
            provider = "kafka"
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_bytes = "1MB"
            linger = "200ms"
            consumer_wait_timeout = "100ms"
            backoff_init = "500ms"
            backoff_max = "10s"
            backoff_base = 2
            backoff_deadline = "5mins"
            num_topics = 32
            num_partitions = 1
            selector_type = "round_robin"
            replication_factor = 1
            create_topic_timeout = "30s"
            topic_name_prefix = "greptimedb_wal_topic"
            [tls]
            server_ca_cert_path = "/path/to/server.pem"
            [sasl]
            type = "SCRAM-SHA-512"
            username = "hi"
            password = "test"
        "#;

        // The same document deserialized as the metasrv-side configuration.
        let metasrv_parsed = toml::from_str::<MetasrvWalConfig>(toml_str).unwrap();
        let metasrv_expected = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
            connection: expected_connection(),
            kafka_topic: expected_topic(),
            auto_create_topics: true,
            auto_prune_interval: Duration::from_secs(0),
            trigger_flush_threshold: 0,
            auto_prune_parallelism: 10,
        });
        assert_eq!(metasrv_parsed, metasrv_expected);

        // The same document deserialized as the datanode-side configuration.
        let datanode_parsed = toml::from_str::<DatanodeWalConfig>(toml_str).unwrap();
        let datanode_expected = DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
            connection: expected_connection(),
            max_batch_bytes: ReadableSize::mb(1),
            consumer_wait_timeout: Duration::from_millis(100),
            kafka_topic: expected_topic(),
            ..Default::default()
        });
        assert_eq!(datanode_parsed, datanode_expected);
    }
}