// File: common_wal/src/config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod kafka;
16pub mod raft_engine;
17
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21
22use crate::config::kafka::common::{
23    DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE,
24    DEFAULT_FLUSH_TRIGGER_SIZE,
25};
26use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
27use crate::config::raft_engine::RaftEngineConfig;
28use crate::error::{Error, UnsupportedWalProviderSnafu};
29
/// Wal configurations for metasrv.
///
/// Serialized/deserialized with an internal `provider` tag (snake_case), so a
/// TOML document selects the variant via `provider = "raft_engine"` or
/// `provider = "kafka"`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[serde(tag = "provider", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
pub enum MetasrvWalConfig {
    /// Local raft-engine WAL; metasrv keeps no remote-WAL state, so the
    /// variant carries no payload. This is the default provider.
    #[default]
    RaftEngine,
    /// Remote WAL backed by Kafka, with the metasrv-side Kafka options.
    Kafka(MetasrvKafkaConfig),
}
39
40#[allow(clippy::large_enum_variant)]
41/// Wal configurations for datanode.
42#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
43#[serde(tag = "provider", rename_all = "snake_case")]
44pub enum DatanodeWalConfig {
45    RaftEngine(RaftEngineConfig),
46    Kafka(DatanodeKafkaConfig),
47    Noop,
48}
49
50impl Default for DatanodeWalConfig {
51    fn default() -> Self {
52        Self::RaftEngine(RaftEngineConfig::default())
53    }
54}
55
56impl TryFrom<DatanodeWalConfig> for MetasrvWalConfig {
57    type Error = Error;
58
59    fn try_from(config: DatanodeWalConfig) -> Result<Self, Self::Error> {
60        match config {
61            DatanodeWalConfig::RaftEngine(_) => Ok(Self::RaftEngine),
62            DatanodeWalConfig::Kafka(config) => Ok(Self::Kafka(MetasrvKafkaConfig {
63                connection: config.connection,
64                kafka_topic: config.kafka_topic,
65                auto_create_topics: config.auto_create_topics,
66                // This field won't be used in standalone mode
67                auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
68                // This field won't be used in standalone mode
69                auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
70                // This field won't be used in standalone mode
71                flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
72                // This field won't be used in standalone mode
73                checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
74            })),
75            DatanodeWalConfig::Noop => UnsupportedWalProviderSnafu {
76                provider: "noop".to_string(),
77            }
78            .fail(),
79        }
80    }
81}
82
83impl MetasrvWalConfig {
84    /// Returns if active wal pruning is enabled.
85    pub fn enable_active_wal_pruning(&self) -> bool {
86        match self {
87            MetasrvWalConfig::RaftEngine => false,
88            MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
89        }
90    }
91
92    /// Gets the kafka connection config.
93    pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
94        match self {
95            MetasrvWalConfig::RaftEngine => None,
96            MetasrvWalConfig::Kafka(config) => Some(config),
97        }
98    }
99}
100
101impl From<MetasrvWalConfig> for DatanodeWalConfig {
102    fn from(config: MetasrvWalConfig) -> Self {
103        match config {
104            MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
105            MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
106                connection: config.connection,
107                kafka_topic: config.kafka_topic,
108                ..Default::default()
109            }),
110        }
111    }
112}
113
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::TopicSelectorType;
    // Use explicit crate paths: the previous `use kafka::common::{..}` and
    // `use tests::kafka::common::KafkaTopicConfig;` only resolved by accident
    // through edition-2018 uniform paths plus the `use super::*` glob.
    use crate::config::kafka::common::{
        KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
        KafkaTopicConfig,
    };
    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};

    /// Deserializing `provider = "raft_engine"` TOML into both config kinds:
    /// unknown keys are ignored, known keys override the defaults.
    #[test]
    fn test_toml_raft_engine() {
        // With none configs.
        let toml_str = r#"
            provider = "raft_engine"
        "#;
        let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(metasrv_wal_config, MetasrvWalConfig::RaftEngine);

        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(
            datanode_wal_config,
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // With useless configs: Kafka-only keys are silently ignored for the
        // raft_engine provider.
        let toml_str = r#"
            provider = "raft_engine"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
        "#;
        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(
            datanode_wal_config,
            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
        );

        // With some useful configs.
        let toml_str = r#"
            provider = "raft_engine"
            file_size = "4MB"
            purge_threshold = "1GB"
            purge_interval = "5mins"
        "#;
        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        let expected = RaftEngineConfig {
            file_size: ReadableSize::mb(4),
            purge_threshold: ReadableSize::gb(1),
            purge_interval: Duration::from_secs(5 * 60),
            ..Default::default()
        };
        assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected));
    }

    /// One `provider = "kafka"` document (with TLS and SASL tables) must
    /// deserialize into both MetasrvWalConfig and DatanodeWalConfig, each
    /// picking up its own subset of keys.
    #[test]
    fn test_toml_kafka() {
        let toml_str = r#"
            provider = "kafka"
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_bytes = "1MB"
            consumer_wait_timeout = "100ms"
            num_topics = 32
            num_partitions = 1
            selector_type = "round_robin"
            replication_factor = 1
            create_topic_timeout = "30s"
            topic_name_prefix = "greptimedb_wal_topic"
            [tls]
            server_ca_cert_path = "/path/to/server.pem"
            [sasl]
            type = "SCRAM-SHA-512"
            username = "hi"
            password = "test"
        "#;

        // Deserialized to MetasrvWalConfig.
        let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
        let expected = MetasrvKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints: vec!["127.0.0.1:9092".to_string()],
                sasl: Some(KafkaClientSasl {
                    config: KafkaClientSaslConfig::ScramSha512 {
                        username: "hi".to_string(),
                        password: "test".to_string(),
                    },
                }),
                tls: Some(KafkaClientTls {
                    server_ca_cert_path: Some("/path/to/server.pem".to_string()),
                    client_cert_path: None,
                    client_key_path: None,
                }),
            },
            kafka_topic: KafkaTopicConfig {
                num_topics: 32,
                selector_type: TopicSelectorType::RoundRobin,
                topic_name_prefix: "greptimedb_wal_topic".to_string(),
                num_partitions: 1,
                replication_factor: 1,
                create_topic_timeout: Duration::from_secs(30),
            },
            // Fields absent from the TOML fall back to their defaults.
            auto_create_topics: true,
            auto_prune_interval: Duration::from_mins(30),
            auto_prune_parallelism: 10,
            flush_trigger_size: ReadableSize::mb(512),
            checkpoint_trigger_size: ReadableSize::mb(128),
        };
        assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

        // Deserialized to DatanodeWalConfig.
        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
        let expected = DatanodeKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints: vec!["127.0.0.1:9092".to_string()],
                sasl: Some(KafkaClientSasl {
                    config: KafkaClientSaslConfig::ScramSha512 {
                        username: "hi".to_string(),
                        password: "test".to_string(),
                    },
                }),
                tls: Some(KafkaClientTls {
                    server_ca_cert_path: Some("/path/to/server.pem".to_string()),
                    client_cert_path: None,
                    client_key_path: None,
                }),
            },
            max_batch_bytes: ReadableSize::mb(1),
            consumer_wait_timeout: Duration::from_millis(100),
            kafka_topic: KafkaTopicConfig {
                num_topics: 32,
                selector_type: TopicSelectorType::RoundRobin,
                topic_name_prefix: "greptimedb_wal_topic".to_string(),
                num_partitions: 1,
                replication_factor: 1,
                create_topic_timeout: Duration::from_secs(30),
            },
            ..Default::default()
        };
        assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
    }
}
257}