1pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::common::{
23 DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE,
24 DEFAULT_FLUSH_TRIGGER_SIZE,
25};
26use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
27use crate::config::raft_engine::RaftEngineConfig;
28
29#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
31#[serde(tag = "provider", rename_all = "snake_case")]
32#[allow(clippy::large_enum_variant)]
33pub enum MetasrvWalConfig {
34 #[default]
35 RaftEngine,
36 Kafka(MetasrvKafkaConfig),
37}
38
39#[allow(clippy::large_enum_variant)]
40#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
42#[serde(tag = "provider", rename_all = "snake_case")]
43pub enum DatanodeWalConfig {
44 RaftEngine(RaftEngineConfig),
45 Kafka(DatanodeKafkaConfig),
46}
47
48impl Default for DatanodeWalConfig {
49 fn default() -> Self {
50 Self::RaftEngine(RaftEngineConfig::default())
51 }
52}
53
54impl From<DatanodeWalConfig> for MetasrvWalConfig {
55 fn from(config: DatanodeWalConfig) -> Self {
56 match config {
57 DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
58 DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
59 connection: config.connection,
60 kafka_topic: config.kafka_topic,
61 auto_create_topics: config.auto_create_topics,
62 auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
64 auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
66 flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
68 checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
70 }),
71 }
72 }
73}
74
75impl MetasrvWalConfig {
76 pub fn enable_active_wal_pruning(&self) -> bool {
78 match self {
79 MetasrvWalConfig::RaftEngine => false,
80 MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
81 }
82 }
83
84 pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
86 match self {
87 MetasrvWalConfig::RaftEngine => None,
88 MetasrvWalConfig::Kafka(config) => Some(config),
89 }
90 }
91}
92
93impl From<MetasrvWalConfig> for DatanodeWalConfig {
94 fn from(config: MetasrvWalConfig) -> Self {
95 match config {
96 MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
97 MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
98 connection: config.connection,
99 kafka_topic: config.kafka_topic,
100 ..Default::default()
101 }),
102 }
103 }
104}
105
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;
    use kafka::common::{
        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
        KafkaTopicConfig,
    };

    use super::*;
    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
    use crate::TopicSelectorType;

    /// Deserializes a datanode WAL configuration, panicking on invalid input.
    fn parse_datanode(toml_str: &str) -> DatanodeWalConfig {
        toml::from_str(toml_str).unwrap()
    }

    /// Deserializes a metasrv WAL configuration, panicking on invalid input.
    fn parse_metasrv(toml_str: &str) -> MetasrvWalConfig {
        toml::from_str(toml_str).unwrap()
    }

    #[test]
    fn test_toml_raft_engine() {
        // Only the provider is given: both sides fall back to their defaults.
        let provider_only = r#"
            provider = "raft_engine"
        "#;
        assert_eq!(parse_metasrv(provider_only), MetasrvWalConfig::RaftEngine);
        assert_eq!(
            parse_datanode(provider_only),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Kafka-only keys next to the raft-engine provider leave the defaults intact.
        let with_kafka_keys = r#"
            provider = "raft_engine"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
        "#;
        assert_eq!(
            parse_datanode(with_kafka_keys),
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Raft-engine keys override the corresponding defaults.
        let with_raft_engine_keys = r#"
            provider = "raft_engine"
            file_size = "4MB"
            purge_threshold = "1GB"
            purge_interval = "5mins"
        "#;
        let expected_raft_engine = RaftEngineConfig {
            file_size: ReadableSize::mb(4),
            purge_threshold: ReadableSize::gb(1),
            purge_interval: Duration::from_secs(5 * 60),
            ..Default::default()
        };
        assert_eq!(
            parse_datanode(with_raft_engine_keys),
            DatanodeWalConfig::RaftEngine(expected_raft_engine)
        );
    }

    #[test]
    fn test_toml_kafka() {
        let toml_str = r#"
            provider = "kafka"
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_bytes = "1MB"
            consumer_wait_timeout = "100ms"
            num_topics = 32
            num_partitions = 1
            selector_type = "round_robin"
            replication_factor = 1
            create_topic_timeout = "30s"
            topic_name_prefix = "greptimedb_wal_topic"
            [tls]
            server_ca_cert_path = "/path/to/server.pem"
            [sasl]
            type = "SCRAM-SHA-512"
            username = "hi"
            password = "test"
        "#;

        // Connection and topic settings expected on both the metasrv and datanode side.
        let connection = KafkaConnectionConfig {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            sasl: Some(KafkaClientSasl {
                config: KafkaClientSaslConfig::ScramSha512 {
                    username: "hi".to_string(),
                    password: "test".to_string(),
                },
            }),
            tls: Some(KafkaClientTls {
                server_ca_cert_path: Some("/path/to/server.pem".to_string()),
                client_cert_path: None,
                client_key_path: None,
            }),
        };
        let kafka_topic = KafkaTopicConfig {
            num_topics: 32,
            selector_type: TopicSelectorType::RoundRobin,
            topic_name_prefix: "greptimedb_wal_topic".to_string(),
            num_partitions: 1,
            replication_factor: 1,
            create_topic_timeout: Duration::from_secs(30),
        };

        // Metasrv side: keys absent from the TOML are expected at the values spelled out here.
        let expected_metasrv = MetasrvKafkaConfig {
            connection: connection.clone(),
            kafka_topic: kafka_topic.clone(),
            auto_create_topics: true,
            auto_prune_interval: Duration::from_secs(30 * 60),
            auto_prune_parallelism: 10,
            flush_trigger_size: ReadableSize::mb(512),
            checkpoint_trigger_size: ReadableSize::mb(128),
        };
        assert_eq!(
            parse_metasrv(toml_str),
            MetasrvWalConfig::Kafka(expected_metasrv)
        );

        // Datanode side: every field not listed here is expected at its default.
        let expected_datanode = DatanodeKafkaConfig {
            connection,
            max_batch_bytes: ReadableSize::mb(1),
            consumer_wait_timeout: Duration::from_millis(100),
            kafka_topic,
            ..Default::default()
        };
        assert_eq!(
            parse_datanode(toml_str),
            DatanodeWalConfig::Kafka(expected_datanode)
        );
    }
}