common_wal/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
23use crate::config::raft_engine::RaftEngineConfig;
24
25/// Wal configurations for metasrv.
26#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
27#[serde(tag = "provider", rename_all = "snake_case")]
28#[allow(clippy::large_enum_variant)]
29pub enum MetasrvWalConfig {
30    #[default]
31    RaftEngine,
32    Kafka(MetasrvKafkaConfig),
33}
34
35#[allow(clippy::large_enum_variant)]
36/// Wal configurations for datanode.
37#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
38#[serde(tag = "provider", rename_all = "snake_case")]
39pub enum DatanodeWalConfig {
40    RaftEngine(RaftEngineConfig),
41    Kafka(DatanodeKafkaConfig),
42}
43
44impl Default for DatanodeWalConfig {
45    fn default() -> Self {
46        Self::RaftEngine(RaftEngineConfig::default())
47    }
48}
49
50impl From<DatanodeWalConfig> for MetasrvWalConfig {
51    fn from(config: DatanodeWalConfig) -> Self {
52        match config {
53            DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
54            DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
55                connection: config.connection,
56                kafka_topic: config.kafka_topic,
57                auto_create_topics: config.auto_create_topics,
58                auto_prune_interval: config.auto_prune_interval,
59                trigger_flush_threshold: config.trigger_flush_threshold,
60                auto_prune_parallelism: config.auto_prune_parallelism,
61            }),
62        }
63    }
64}
65
66impl MetasrvWalConfig {
67    /// Returns if active wal pruning is enabled.
68    pub fn enable_active_wal_pruning(&self) -> bool {
69        match self {
70            MetasrvWalConfig::RaftEngine => false,
71            MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
72        }
73    }
74
75    /// Gets the kafka connection config.
76    pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
77        match self {
78            MetasrvWalConfig::RaftEngine => None,
79            MetasrvWalConfig::Kafka(config) => Some(config),
80        }
81    }
82}
83
84impl From<MetasrvWalConfig> for DatanodeWalConfig {
85    fn from(config: MetasrvWalConfig) -> Self {
86        match config {
87            MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
88            MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
89                connection: config.connection,
90                kafka_topic: config.kafka_topic,
91                ..Default::default()
92            }),
93        }
94    }
95}
96
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::config::kafka::common::{
        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
        KafkaTopicConfig,
    };
    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
    use crate::TopicSelectorType;

    /// Connection settings both Kafka configs are expected to hold after
    /// parsing the TOML used in `test_toml_kafka`.
    fn expected_connection() -> KafkaConnectionConfig {
        KafkaConnectionConfig {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            sasl: Some(KafkaClientSasl {
                config: KafkaClientSaslConfig::ScramSha512 {
                    username: "hi".to_string(),
                    password: "test".to_string(),
                },
            }),
            tls: Some(KafkaClientTls {
                server_ca_cert_path: Some("/path/to/server.pem".to_string()),
                client_cert_path: None,
                client_key_path: None,
            }),
        }
    }

    /// Topic settings both Kafka configs are expected to hold after parsing
    /// the TOML used in `test_toml_kafka`.
    fn expected_kafka_topic() -> KafkaTopicConfig {
        KafkaTopicConfig {
            num_topics: 32,
            selector_type: TopicSelectorType::RoundRobin,
            topic_name_prefix: "greptimedb_wal_topic".to_string(),
            num_partitions: 1,
            replication_factor: 1,
            create_topic_timeout: Duration::from_secs(30),
        }
    }

    #[test]
    fn test_toml_raft_engine() {
        // Only the provider is given.
        let provider_only = r#"
            provider = "raft_engine"
        "#;
        let metasrv = toml::from_str::<MetasrvWalConfig>(provider_only).unwrap();
        assert_eq!(metasrv, MetasrvWalConfig::RaftEngine);

        let datanode = toml::from_str::<DatanodeWalConfig>(provider_only).unwrap();
        assert_eq!(
            datanode,
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Kafka-only keys next to the raft-engine provider must be ignored.
        let with_unrelated_keys = r#"
            provider = "raft_engine"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
        "#;
        let datanode = toml::from_str::<DatanodeWalConfig>(with_unrelated_keys).unwrap();
        assert_eq!(
            datanode,
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // Raft-engine keys must be picked up; the rest stay at their defaults.
        let with_raft_engine_keys = r#"
            provider = "raft_engine"
            file_size = "4MB"
            purge_threshold = "1GB"
            purge_interval = "5mins"
        "#;
        let datanode = toml::from_str::<DatanodeWalConfig>(with_raft_engine_keys).unwrap();
        let want = RaftEngineConfig {
            file_size: ReadableSize::mb(4),
            purge_threshold: ReadableSize::gb(1),
            purge_interval: Duration::from_secs(5 * 60),
            ..Default::default()
        };
        assert_eq!(datanode, DatanodeWalConfig::RaftEngine(want));
    }

    #[test]
    fn test_toml_kafka() {
        let kafka_toml = r#"
            provider = "kafka"
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_bytes = "1MB"
            linger = "200ms"
            consumer_wait_timeout = "100ms"
            backoff_init = "500ms"
            backoff_max = "10s"
            backoff_base = 2
            backoff_deadline = "5mins"
            num_topics = 32
            num_partitions = 1
            selector_type = "round_robin"
            replication_factor = 1
            create_topic_timeout = "30s"
            topic_name_prefix = "greptimedb_wal_topic"
            [tls]
            server_ca_cert_path = "/path/to/server.pem"
            [sasl]
            type = "SCRAM-SHA-512"
            username = "hi"
            password = "test"
        "#;

        // The same document read as the metasrv view.
        let metasrv = toml::from_str::<MetasrvWalConfig>(kafka_toml).unwrap();
        let want_metasrv = MetasrvKafkaConfig {
            connection: expected_connection(),
            kafka_topic: expected_kafka_topic(),
            auto_create_topics: true,
            auto_prune_interval: Duration::from_secs(0),
            trigger_flush_threshold: 0,
            auto_prune_parallelism: 10,
        };
        assert_eq!(metasrv, MetasrvWalConfig::Kafka(want_metasrv));

        // The same document read as the datanode view.
        let datanode = toml::from_str::<DatanodeWalConfig>(kafka_toml).unwrap();
        let want_datanode = DatanodeKafkaConfig {
            connection: expected_connection(),
            max_batch_bytes: ReadableSize::mb(1),
            consumer_wait_timeout: Duration::from_millis(100),
            kafka_topic: expected_kafka_topic(),
            ..Default::default()
        };
        assert_eq!(datanode, DatanodeWalConfig::Kafka(want_datanode));
    }
}