common_wal/config/kafka/
common.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Cursor;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use rskafka::BackoffConfig;
21use rskafka::client::{Credentials, SaslConfig};
22use rustls::{ClientConfig, RootCertStore};
23use serde::{Deserialize, Serialize};
24use snafu::{OptionExt, ResultExt};
25
/// The default backoff config for kafka client.
///
/// If the operation fails, the client will retry 3 times.
/// The backoff time is 100ms, 300ms, 900ms (exponential with base 3,
/// capped at `max_backoff`).
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
    // First retry waits 100ms.
    init_backoff: Duration::from_millis(100),
    // Any single backoff interval is capped at 1s.
    max_backoff: Duration::from_secs(1),
    // Multiplier applied per retry: 100ms -> 300ms -> 900ms.
    base: 3.0,
    // The deadline shouldn't be too long,
    // otherwise the client will block the worker loop for a long time.
    deadline: Some(Duration::from_secs(3)),
};
38
/// The default connect timeout for kafka client.
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
41
/// Default interval for auto WAL pruning (30 minutes).
///
/// Expressed via `Duration::from_secs` so the constant compiles on stable
/// toolchains; `Duration::from_mins` was only recently stabilized
/// (`duration_constructors_lite`).
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_secs(30 * 60);
/// Default limit on the number of auto-pruning tasks running concurrently.
pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
/// Default WAL size threshold that triggers a flush.
pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
/// Default size threshold that triggers a checkpoint.
pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);
50
51use crate::error::{self, Result};
52use crate::{BROKER_ENDPOINT, TOPIC_NAME_PREFIX, TopicSelectorType};
53
/// The SASL configurations for kafka client.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaClientSasl {
    /// The SASL mechanism and its credentials; `flatten` makes the inner
    /// fields (e.g. `type`, `username`, `password`) appear directly at this
    /// level in the serialized form.
    #[serde(flatten)]
    pub config: KafkaClientSaslConfig,
}
60
/// The supported SASL mechanisms and their credentials.
///
/// Serialized with an internal `type` tag; variant names are rendered in
/// SCREAMING-KEBAB-CASE (`"PLAIN"`) or via the explicit renames below
/// (`"SCRAM-SHA-256"`, `"SCRAM-SHA-512"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
pub enum KafkaClientSaslConfig {
    /// SASL/PLAIN mechanism.
    Plain {
        username: String,
        password: String,
    },
    /// SASL/SCRAM using SHA-256.
    #[serde(rename = "SCRAM-SHA-256")]
    ScramSha256 {
        username: String,
        password: String,
    },
    /// SASL/SCRAM using SHA-512.
    #[serde(rename = "SCRAM-SHA-512")]
    ScramSha512 {
        username: String,
        password: String,
    },
}
79
80impl KafkaClientSaslConfig {
81    /// Converts to [`SaslConfig`].
82    pub fn into_sasl_config(self) -> SaslConfig {
83        match self {
84            KafkaClientSaslConfig::Plain { username, password } => {
85                SaslConfig::Plain(Credentials::new(username, password))
86            }
87            KafkaClientSaslConfig::ScramSha256 { username, password } => {
88                SaslConfig::ScramSha256(Credentials::new(username, password))
89            }
90            KafkaClientSaslConfig::ScramSha512 { username, password } => {
91                SaslConfig::ScramSha512(Credentials::new(username, password))
92            }
93        }
94    }
95}
96
/// The TLS configurations for kafka client.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaClientTls {
    /// Path to a PEM file holding CA certificate(s) used to verify the
    /// broker. The system's native trust roots are loaded in addition.
    pub server_ca_cert_path: Option<String>,
    /// Path to the PEM client certificate (chain) for mutual TLS.
    pub client_cert_path: Option<String>,
    /// Path to the PEM client private key. Client authentication is enabled
    /// only when this and `client_cert_path` are both set.
    pub client_key_path: Option<String>,
}
104
impl KafkaClientTls {
    /// Builds the [`ClientConfig`].
    ///
    /// The trust-root set is the optional PEM bundle at `server_ca_cert_path`
    /// plus the platform's native certificate store. Client (mutual-TLS)
    /// authentication is configured only when both `client_cert_path` and
    /// `client_key_path` are present; otherwise the config performs
    /// server-authentication only.
    ///
    /// # Errors
    /// Fails if a configured file cannot be read or parsed as PEM, a CA cert
    /// cannot be added to the root store, native system certs cannot be
    /// loaded, or the client cert/key pair is rejected.
    pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
        let builder = ClientConfig::builder();
        let mut roots = RootCertStore::empty();

        // Add every certificate found in the user-supplied CA bundle, if any.
        if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
            let root_cert_bytes =
                tokio::fs::read(&server_ca_cert_path)
                    .await
                    .context(error::ReadFileSnafu {
                        path: server_ca_cert_path,
                    })?;
            let mut cursor = Cursor::new(root_cert_bytes);
            // Any malformed cert in the user-supplied bundle is a hard error.
            for cert in rustls_pemfile::certs(&mut cursor)
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(error::ReadCertsSnafu {
                    path: server_ca_cert_path,
                })?
            {
                roots.add(cert).context(error::AddCertSnafu)?;
            }
        };
        // Also trust the OS certificate store. `add_parsable_certificates`
        // skips unparsable native certs instead of failing.
        roots.add_parsable_certificates(
            rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
        );

        let builder = builder.with_root_certificates(roots);
        // Mutual TLS requires both halves of the client identity.
        let config = if let (Some(cert_path), Some(key_path)) =
            (&self.client_cert_path, &self.client_key_path)
        {
            let cert_bytes = tokio::fs::read(cert_path)
                .await
                .context(error::ReadFileSnafu { path: cert_path })?;
            let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(error::ReadCertsSnafu { path: cert_path })?;
            let key_bytes = tokio::fs::read(key_path)
                .await
                .context(error::ReadFileSnafu { path: key_path })?;
            // A key file that parses but contains no private key yields
            // `KeyNotFound` rather than a silent no-auth fallback.
            let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
                .context(error::ReadKeySnafu { path: key_path })?
                .context(error::KeyNotFoundSnafu { path: key_path })?;

            builder
                .with_client_auth_cert(client_certs, client_key)
                .context(error::SetClientAuthCertSnafu)?
        } else {
            builder.with_no_client_auth()
        };

        Ok(Arc::new(config))
    }
}
159
/// The connection configurations for kafka clients.
///
/// `#[serde(default)]` lets any omitted field fall back to
/// [`KafkaConnectionConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaConnectionConfig {
    /// The broker endpoints of the Kafka cluster.
    pub broker_endpoints: Vec<String>,
    /// Optional client SASL authentication; `None` disables SASL.
    pub sasl: Option<KafkaClientSasl>,
    /// Optional client TLS config; `None` disables TLS.
    pub tls: Option<KafkaClientTls>,
}
171
172impl Default for KafkaConnectionConfig {
173    fn default() -> Self {
174        Self {
175            broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
176            sasl: None,
177            tls: None,
178        }
179    }
180}
181
/// Topic configurations for kafka clients.
///
/// `#[serde(default)]` lets any omitted field fall back to
/// [`KafkaTopicConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaTopicConfig {
    /// Number of topics.
    pub num_topics: usize,
    /// Number of partitions per topic.
    pub num_partitions: i32,
    /// The type of the topic selector with which to select a topic for a region.
    pub selector_type: TopicSelectorType,
    /// The replication factor of each topic.
    pub replication_factor: i16,
    /// The timeout of topic creation.
    ///
    /// (De)serialized in human-readable form, e.g. `"30s"`.
    #[serde(with = "humantime_serde")]
    pub create_topic_timeout: Duration,
    /// Topic name prefix.
    pub topic_name_prefix: String,
}
200
201impl Default for KafkaTopicConfig {
202    fn default() -> Self {
203        Self {
204            num_topics: 64,
205            num_partitions: 1,
206            selector_type: TopicSelectorType::RoundRobin,
207            replication_factor: 1,
208            create_topic_timeout: Duration::from_secs(30),
209            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
210        }
211    }
212}