// common_wal/config.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::common::{
23    DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE,
24    DEFAULT_FLUSH_TRIGGER_SIZE,
25};
26use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
27use crate::config::raft_engine::RaftEngineConfig;
28
29/// Wal configurations for metasrv.
30#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
31#[serde(tag = "provider", rename_all = "snake_case")]
32#[allow(clippy::large_enum_variant)]
33pub enum MetasrvWalConfig {
34    #[default]
35    RaftEngine,
36    Kafka(MetasrvKafkaConfig),
37}
38
39#[allow(clippy::large_enum_variant)]
40/// Wal configurations for datanode.
41#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
42#[serde(tag = "provider", rename_all = "snake_case")]
43pub enum DatanodeWalConfig {
44    RaftEngine(RaftEngineConfig),
45    Kafka(DatanodeKafkaConfig),
46}
47
48impl Default for DatanodeWalConfig {
49    fn default() -> Self {
50        Self::RaftEngine(RaftEngineConfig::default())
51    }
52}
53
54impl From<DatanodeWalConfig> for MetasrvWalConfig {
55    fn from(config: DatanodeWalConfig) -> Self {
56        match config {
57            DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
58            DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
59                connection: config.connection,
60                kafka_topic: config.kafka_topic,
61                auto_create_topics: config.auto_create_topics,
62                // This field won't be used in standalone mode
63                auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
64                // This field won't be used in standalone mode
65                auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
66                // This field won't be used in standalone mode
67                flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
68                // This field won't be used in standalone mode
69                checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
70            }),
71        }
72    }
73}
74
75impl MetasrvWalConfig {
76    /// Returns if active wal pruning is enabled.
77    pub fn enable_active_wal_pruning(&self) -> bool {
78        match self {
79            MetasrvWalConfig::RaftEngine => false,
80            MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
81        }
82    }
83
84    /// Gets the kafka connection config.
85    pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
86        match self {
87            MetasrvWalConfig::RaftEngine => None,
88            MetasrvWalConfig::Kafka(config) => Some(config),
89        }
90    }
91}
92
93impl From<MetasrvWalConfig> for DatanodeWalConfig {
94    fn from(config: MetasrvWalConfig) -> Self {
95        match config {
96            MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
97            MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
98                connection: config.connection,
99                kafka_topic: config.kafka_topic,
100                ..Default::default()
101            }),
102        }
103    }
104}
105
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;

    use super::*;
    // Use explicit crate paths: the original `use kafka::common::{...}` relied
    // on the glob-imported module name and `use tests::kafka::common::...`
    // self-referenced this test module.
    use crate::config::kafka::common::{
        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
        KafkaTopicConfig,
    };
    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
    use crate::TopicSelectorType;

    /// Deserializing `provider = "raft_engine"` TOML into both config kinds.
    #[test]
    fn test_toml_raft_engine() {
        // With none configs.
        let toml_str = r#"
            provider = "raft_engine"
        "#;
        let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(metasrv_wal_config, MetasrvWalConfig::RaftEngine);

        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(
            datanode_wal_config,
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // With useless configs: Kafka-only keys are silently ignored.
        let toml_str = r#"
            provider = "raft_engine"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
        "#;
        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(
            datanode_wal_config,
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // With some useful configs.
        let toml_str = r#"
            provider = "raft_engine"
            file_size = "4MB"
            purge_threshold = "1GB"
            purge_interval = "5mins"
        "#;
        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        let expected = RaftEngineConfig {
            file_size: ReadableSize::mb(4),
            purge_threshold: ReadableSize::gb(1),
            purge_interval: Duration::from_secs(5 * 60),
            ..Default::default()
        };
        assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected));
    }

    /// Deserializing a full `provider = "kafka"` TOML (incl. TLS and SASL)
    /// into both config kinds; metasrv-only fields get their defaults.
    #[test]
    fn test_toml_kafka() {
        let toml_str = r#"
            provider = "kafka"
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_bytes = "1MB"
            consumer_wait_timeout = "100ms"
            num_topics = 32
            num_partitions = 1
            selector_type = "round_robin"
            replication_factor = 1
            create_topic_timeout = "30s"
            topic_name_prefix = "greptimedb_wal_topic"
            [tls]
            server_ca_cert_path = "/path/to/server.pem"
            [sasl]
            type = "SCRAM-SHA-512"
            username = "hi"
            password = "test"
        "#;

        // Deserialized to MetasrvWalConfig.
        let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
        let expected = MetasrvKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints: vec!["127.0.0.1:9092".to_string()],
                sasl: Some(KafkaClientSasl {
                    config: KafkaClientSaslConfig::ScramSha512 {
                        username: "hi".to_string(),
                        password: "test".to_string(),
                    },
                }),
                tls: Some(KafkaClientTls {
                    server_ca_cert_path: Some("/path/to/server.pem".to_string()),
                    client_cert_path: None,
                    client_key_path: None,
                }),
            },
            kafka_topic: KafkaTopicConfig {
                num_topics: 32,
                selector_type: TopicSelectorType::RoundRobin,
                topic_name_prefix: "greptimedb_wal_topic".to_string(),
                num_partitions: 1,
                replication_factor: 1,
                create_topic_timeout: Duration::from_secs(30),
            },
            auto_create_topics: true,
            // 30 minutes; `Duration::from_mins` is nightly-only, so spell it
            // out with the stable constructor.
            auto_prune_interval: Duration::from_secs(30 * 60),
            auto_prune_parallelism: 10,
            flush_trigger_size: ReadableSize::mb(512),
            checkpoint_trigger_size: ReadableSize::mb(128),
        };
        assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

        // Deserialized to DatanodeWalConfig.
        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        let expected = DatanodeKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints: vec!["127.0.0.1:9092".to_string()],
                sasl: Some(KafkaClientSasl {
                    config: KafkaClientSaslConfig::ScramSha512 {
                        username: "hi".to_string(),
                        password: "test".to_string(),
                    },
                }),
                tls: Some(KafkaClientTls {
                    server_ca_cert_path: Some("/path/to/server.pem".to_string()),
                    client_cert_path: None,
                    client_key_path: None,
                }),
            },
            max_batch_bytes: ReadableSize::mb(1),
            consumer_wait_timeout: Duration::from_millis(100),
            kafka_topic: KafkaTopicConfig {
                num_topics: 32,
                selector_type: TopicSelectorType::RoundRobin,
                topic_name_prefix: "greptimedb_wal_topic".to_string(),
                num_partitions: 1,
                replication_factor: 1,
                create_topic_timeout: Duration::from_secs(30),
            },
            ..Default::default()
        };
        assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
    }
}