common_wal/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
23use crate::config::raft_engine::RaftEngineConfig;
24
25/// Wal configurations for metasrv.
26#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
27#[serde(tag = "provider", rename_all = "snake_case")]
28#[allow(clippy::large_enum_variant)]
29pub enum MetasrvWalConfig {
30    #[default]
31    RaftEngine,
32    Kafka(MetasrvKafkaConfig),
33}
34
35#[allow(clippy::large_enum_variant)]
36/// Wal configurations for datanode.
37#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
38#[serde(tag = "provider", rename_all = "snake_case")]
39pub enum DatanodeWalConfig {
40    RaftEngine(RaftEngineConfig),
41    Kafka(DatanodeKafkaConfig),
42}
43
44impl Default for DatanodeWalConfig {
45    fn default() -> Self {
46        Self::RaftEngine(RaftEngineConfig::default())
47    }
48}
49
50impl From<DatanodeWalConfig> for MetasrvWalConfig {
51    fn from(config: DatanodeWalConfig) -> Self {
52        match config {
53            DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
54            DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
55                connection: config.connection,
56                kafka_topic: config.kafka_topic,
57                auto_create_topics: config.auto_create_topics,
58                auto_prune_interval: config.auto_prune_interval,
59                trigger_flush_threshold: config.trigger_flush_threshold,
60                auto_prune_parallelism: config.auto_prune_parallelism,
61            }),
62        }
63    }
64}
65
66impl MetasrvWalConfig {
67    /// Returns if active wal pruning is enabled.
68    pub fn enable_active_wal_pruning(&self) -> bool {
69        match self {
70            MetasrvWalConfig::RaftEngine => false,
71            MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
72        }
73    }
74
75    /// Gets the kafka connection config.
76    pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
77        match self {
78            MetasrvWalConfig::RaftEngine => None,
79            MetasrvWalConfig::Kafka(config) => Some(config),
80        }
81    }
82}
83
84impl From<MetasrvWalConfig> for DatanodeWalConfig {
85    fn from(config: MetasrvWalConfig) -> Self {
86        match config {
87            MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
88            MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
89                connection: config.connection,
90                kafka_topic: config.kafka_topic,
91                ..Default::default()
92            }),
93        }
94    }
95}
96
97#[cfg(test)]
98mod tests {
99    use std::time::Duration;
100
101    use common_base::readable_size::ReadableSize;
102    use kafka::common::{
103        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
104    };
105    use tests::kafka::common::KafkaTopicConfig;
106
107    use super::*;
108    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
109    use crate::TopicSelectorType;
110
111    #[test]
112    fn test_toml_raft_engine() {
113        // With none configs.
114        let toml_str = r#"
115            provider = "raft_engine"
116        "#;
117        let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
118        assert_eq!(metasrv_wal_config, MetasrvWalConfig::RaftEngine);
119
120        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
121        assert_eq!(
122            datanode_wal_config,
123            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
124        );
125
126        // With useless configs.
127        let toml_str = r#"
128            provider = "raft_engine"
129            broker_endpoints = ["127.0.0.1:9092"]
130            num_topics = 32
131        "#;
132        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
133        assert_eq!(
134            datanode_wal_config,
135            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
136        );
137
138        // With some useful configs.
139        let toml_str = r#"
140            provider = "raft_engine"
141            file_size = "4MB"
142            purge_threshold = "1GB"
143            purge_interval = "5mins"
144        "#;
145        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
146        let expected = RaftEngineConfig {
147            file_size: ReadableSize::mb(4),
148            purge_threshold: ReadableSize::gb(1),
149            purge_interval: Duration::from_secs(5 * 60),
150            ..Default::default()
151        };
152        assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected));
153    }
154
155    #[test]
156    fn test_toml_kafka() {
157        let toml_str = r#"
158            provider = "kafka"
159            broker_endpoints = ["127.0.0.1:9092"]
160            max_batch_bytes = "1MB"
161            consumer_wait_timeout = "100ms"
162            num_topics = 32
163            num_partitions = 1
164            selector_type = "round_robin"
165            replication_factor = 1
166            create_topic_timeout = "30s"
167            topic_name_prefix = "greptimedb_wal_topic"
168            [tls]
169            server_ca_cert_path = "/path/to/server.pem"
170            [sasl]
171            type = "SCRAM-SHA-512"
172            username = "hi"
173            password = "test"
174        "#;
175
176        // Deserialized to MetasrvWalConfig.
177        let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
178        let expected = MetasrvKafkaConfig {
179            connection: KafkaConnectionConfig {
180                broker_endpoints: vec!["127.0.0.1:9092".to_string()],
181                sasl: Some(KafkaClientSasl {
182                    config: KafkaClientSaslConfig::ScramSha512 {
183                        username: "hi".to_string(),
184                        password: "test".to_string(),
185                    },
186                }),
187                tls: Some(KafkaClientTls {
188                    server_ca_cert_path: Some("/path/to/server.pem".to_string()),
189                    client_cert_path: None,
190                    client_key_path: None,
191                }),
192            },
193            kafka_topic: KafkaTopicConfig {
194                num_topics: 32,
195                selector_type: TopicSelectorType::RoundRobin,
196                topic_name_prefix: "greptimedb_wal_topic".to_string(),
197                num_partitions: 1,
198                replication_factor: 1,
199                create_topic_timeout: Duration::from_secs(30),
200            },
201            auto_create_topics: true,
202            auto_prune_interval: Duration::from_secs(0),
203            trigger_flush_threshold: 0,
204            auto_prune_parallelism: 10,
205        };
206        assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
207
208        // Deserialized to DatanodeWalConfig.
209        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
210        let expected = DatanodeKafkaConfig {
211            connection: KafkaConnectionConfig {
212                broker_endpoints: vec!["127.0.0.1:9092".to_string()],
213                sasl: Some(KafkaClientSasl {
214                    config: KafkaClientSaslConfig::ScramSha512 {
215                        username: "hi".to_string(),
216                        password: "test".to_string(),
217                    },
218                }),
219                tls: Some(KafkaClientTls {
220                    server_ca_cert_path: Some("/path/to/server.pem".to_string()),
221                    client_cert_path: None,
222                    client_key_path: None,
223                }),
224            },
225            max_batch_bytes: ReadableSize::mb(1),
226            consumer_wait_timeout: Duration::from_millis(100),
227            kafka_topic: KafkaTopicConfig {
228                num_topics: 32,
229                selector_type: TopicSelectorType::RoundRobin,
230                topic_name_prefix: "greptimedb_wal_topic".to_string(),
231                num_partitions: 1,
232                replication_factor: 1,
233                create_topic_timeout: Duration::from_secs(30),
234            },
235            ..Default::default()
236        };
237        assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
238    }
239}