1pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
23use crate::config::raft_engine::RaftEngineConfig;
24
25#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
27#[serde(tag = "provider", rename_all = "snake_case")]
28#[allow(clippy::large_enum_variant)]
29pub enum MetasrvWalConfig {
30 #[default]
31 RaftEngine,
32 Kafka(MetasrvKafkaConfig),
33}
34
35#[allow(clippy::large_enum_variant)]
36#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
38#[serde(tag = "provider", rename_all = "snake_case")]
39pub enum DatanodeWalConfig {
40 RaftEngine(RaftEngineConfig),
41 Kafka(DatanodeKafkaConfig),
42}
43
44impl Default for DatanodeWalConfig {
45 fn default() -> Self {
46 Self::RaftEngine(RaftEngineConfig::default())
47 }
48}
49
50impl From<DatanodeWalConfig> for MetasrvWalConfig {
51 fn from(config: DatanodeWalConfig) -> Self {
52 match config {
53 DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
54 DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
55 connection: config.connection,
56 kafka_topic: config.kafka_topic,
57 auto_create_topics: config.auto_create_topics,
58 auto_prune_interval: config.auto_prune_interval,
59 trigger_flush_threshold: config.trigger_flush_threshold,
60 auto_prune_parallelism: config.auto_prune_parallelism,
61 }),
62 }
63 }
64}
65
66impl MetasrvWalConfig {
67 pub fn enable_active_wal_pruning(&self) -> bool {
69 match self {
70 MetasrvWalConfig::RaftEngine => false,
71 MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
72 }
73 }
74
75 pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
77 match self {
78 MetasrvWalConfig::RaftEngine => None,
79 MetasrvWalConfig::Kafka(config) => Some(config),
80 }
81 }
82}
83
84impl From<MetasrvWalConfig> for DatanodeWalConfig {
85 fn from(config: MetasrvWalConfig) -> Self {
86 match config {
87 MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
88 MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
89 connection: config.connection,
90 kafka_topic: config.kafka_topic,
91 ..Default::default()
92 }),
93 }
94 }
95}
96
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;

    use super::kafka::common::{
        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
        KafkaTopicConfig,
    };
    use super::*;
    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
    use crate::TopicSelectorType;

    /// Connection settings the Kafka fixture in `test_toml_kafka` must decode to.
    fn expected_connection() -> KafkaConnectionConfig {
        let sasl = KafkaClientSasl {
            config: KafkaClientSaslConfig::ScramSha512 {
                username: "hi".to_string(),
                password: "test".to_string(),
            },
        };
        let tls = KafkaClientTls {
            server_ca_cert_path: Some("/path/to/server.pem".to_string()),
            client_cert_path: None,
            client_key_path: None,
        };
        KafkaConnectionConfig {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            sasl: Some(sasl),
            tls: Some(tls),
        }
    }

    /// Topic settings the Kafka fixture in `test_toml_kafka` must decode to.
    fn expected_topic() -> KafkaTopicConfig {
        KafkaTopicConfig {
            num_topics: 32,
            selector_type: TopicSelectorType::RoundRobin,
            topic_name_prefix: "greptimedb_wal_topic".to_string(),
            num_partitions: 1,
            replication_factor: 1,
            create_topic_timeout: Duration::from_secs(30),
        }
    }

    #[test]
    fn test_toml_raft_engine() {
        // Only the provider tag is present.
        let toml_str = r#"
            provider = "raft_engine"
        "#;
        assert_eq!(
            toml::from_str::<MetasrvWalConfig>(toml_str).unwrap(),
            MetasrvWalConfig::RaftEngine
        );
        assert_eq!(
            toml::from_str::<DatanodeWalConfig>(toml_str).unwrap(),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Kafka-style keys next to the raft_engine tag: the result is still
        // expected to be the default raft-engine config.
        let toml_str = r#"
            provider = "raft_engine"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
        "#;
        assert_eq!(
            toml::from_str::<DatanodeWalConfig>(toml_str).unwrap(),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Explicit raft-engine settings override the corresponding defaults.
        let toml_str = r#"
            provider = "raft_engine"
            file_size = "4MB"
            purge_threshold = "1GB"
            purge_interval = "5mins"
        "#;
        let raft_engine = RaftEngineConfig {
            file_size: ReadableSize::mb(4),
            purge_threshold: ReadableSize::gb(1),
            purge_interval: Duration::from_secs(5 * 60),
            ..Default::default()
        };
        assert_eq!(
            toml::from_str::<DatanodeWalConfig>(toml_str).unwrap(),
            DatanodeWalConfig::RaftEngine(raft_engine)
        );
    }

    #[test]
    fn test_toml_kafka() {
        let toml_str = r#"
            provider = "kafka"
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_bytes = "1MB"
            consumer_wait_timeout = "100ms"
            num_topics = 32
            num_partitions = 1
            selector_type = "round_robin"
            replication_factor = 1
            create_topic_timeout = "30s"
            topic_name_prefix = "greptimedb_wal_topic"
            [tls]
            server_ca_cert_path = "/path/to/server.pem"
            [sasl]
            type = "SCRAM-SHA-512"
            username = "hi"
            password = "test"
        "#;

        // Decode the fixture as the metasrv-side config.
        let decoded = toml::from_str::<MetasrvWalConfig>(toml_str).unwrap();
        let metasrv_kafka = MetasrvKafkaConfig {
            connection: expected_connection(),
            kafka_topic: expected_topic(),
            auto_create_topics: true,
            auto_prune_interval: Duration::ZERO,
            trigger_flush_threshold: 0,
            auto_prune_parallelism: 10,
        };
        assert_eq!(decoded, MetasrvWalConfig::Kafka(metasrv_kafka));

        // Decode the same fixture as the datanode-side config.
        let decoded = toml::from_str::<DatanodeWalConfig>(toml_str).unwrap();
        let datanode_kafka = DatanodeKafkaConfig {
            connection: expected_connection(),
            max_batch_bytes: ReadableSize::mb(1),
            consumer_wait_timeout: Duration::from_millis(100),
            kafka_topic: expected_topic(),
            ..Default::default()
        };
        assert_eq!(decoded, DatanodeWalConfig::Kafka(datanode_kafka));
    }
}