common_wal/config/kafka/
common.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Cursor;
16use std::sync::Arc;
17use std::time::Duration;
18
19use rskafka::client::{Credentials, SaslConfig};
20use rskafka::BackoffConfig;
21use rustls::{ClientConfig, RootCertStore};
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt};
24
/// The default backoff config for kafka client.
///
/// If the operation fails, the client will retry 3 times.
/// The backoff time is 100ms, 300ms, 900ms.
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
    // First retry waits 100ms; each later wait is the previous one times `base`.
    init_backoff: Duration::from_millis(100),
    // Each individual wait is capped at 1s.
    max_backoff: Duration::from_secs(1),
    // Exponential multiplier: 100ms -> 300ms -> 900ms.
    base: 3.0,
    // The deadline shouldn't be too long,
    // otherwise the client will block the worker loop for a long time.
    deadline: Some(Duration::from_secs(3)),
};
37
/// Default interval for auto WAL pruning.
///
/// NOTE(review): `Duration::ZERO` presumably means auto pruning is disabled
/// by default — confirm against the pruning scheduler.
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
/// Default limit for concurrent auto pruning tasks.
pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
/// Default threshold for sending flush requests to regions when pruning remote WAL.
///
/// NOTE(review): previously documented as an "interval", but this is a `u64`
/// count rather than a duration; 0 presumably selects an automatic/derived
/// threshold — confirm against the consumer of this constant.
pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0;
44
45use crate::error::{self, Result};
46use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
47
/// The SASL configurations for kafka client.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaClientSasl {
    /// The SASL mechanism and credentials. Flattened so the inner fields
    /// (`type`, `username`, `password`) appear directly at this struct's
    /// level in the serialized form.
    #[serde(flatten)]
    pub config: KafkaClientSaslConfig,
}
54
/// The supported SASL mechanisms with their credentials.
///
/// Serialized internally tagged on `type`; variant names render in
/// SCREAMING-KEBAB-CASE (so `Plain` becomes `PLAIN`), and the SCRAM variants
/// are explicitly renamed to the standard mechanism identifiers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
pub enum KafkaClientSaslConfig {
    /// The SASL/PLAIN mechanism (`type = "PLAIN"`).
    Plain {
        username: String,
        password: String,
    },
    /// The SASL/SCRAM-SHA-256 mechanism.
    #[serde(rename = "SCRAM-SHA-256")]
    ScramSha256 {
        username: String,
        password: String,
    },
    /// The SASL/SCRAM-SHA-512 mechanism.
    #[serde(rename = "SCRAM-SHA-512")]
    ScramSha512 {
        username: String,
        password: String,
    },
}
73
74impl KafkaClientSaslConfig {
75    /// Converts to [`SaslConfig`].
76    pub fn into_sasl_config(self) -> SaslConfig {
77        match self {
78            KafkaClientSaslConfig::Plain { username, password } => {
79                SaslConfig::Plain(Credentials::new(username, password))
80            }
81            KafkaClientSaslConfig::ScramSha256 { username, password } => {
82                SaslConfig::ScramSha256(Credentials::new(username, password))
83            }
84            KafkaClientSaslConfig::ScramSha512 { username, password } => {
85                SaslConfig::ScramSha512(Credentials::new(username, password))
86            }
87        }
88    }
89}
90
/// The TLS configurations for kafka client.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaClientTls {
    /// Path to a PEM file containing CA certificate(s) used to verify the
    /// server. When `None`, only the system's native roots are trusted.
    pub server_ca_cert_path: Option<String>,
    /// Path to the client certificate (PEM) for mutual TLS; only takes
    /// effect together with `client_key_path`.
    pub client_cert_path: Option<String>,
    /// Path to the client private key (PEM) for mutual TLS; only takes
    /// effect together with `client_cert_path`.
    pub client_key_path: Option<String>,
}
98
impl KafkaClientTls {
    /// Builds the [`ClientConfig`].
    ///
    /// Trust roots are assembled from the optional PEM CA bundle at
    /// `server_ca_cert_path` plus the platform's native certificate store.
    /// When both `client_cert_path` and `client_key_path` are set, the
    /// certificate/key pair is loaded for client (mutual TLS) authentication;
    /// otherwise no client auth is configured.
    ///
    /// # Errors
    /// Fails if any configured file cannot be read, its PEM contents cannot
    /// be parsed, a CA certificate cannot be added to the root store, the
    /// system certificates cannot be loaded, or the client auth cert/key pair
    /// is rejected by rustls.
    pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
        let builder = ClientConfig::builder();
        let mut roots = RootCertStore::empty();

        // Optionally seed the root store with the user-provided CA bundle.
        if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
            let root_cert_bytes =
                tokio::fs::read(&server_ca_cert_path)
                    .await
                    .context(error::ReadFileSnafu {
                        path: server_ca_cert_path,
                    })?;
            let mut cursor = Cursor::new(root_cert_bytes);
            // A bundle may hold multiple certificates; fail on the first
            // unparsable entry instead of silently skipping it.
            for cert in rustls_pemfile::certs(&mut cursor)
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(error::ReadCertsSnafu {
                    path: server_ca_cert_path,
                })?
            {
                roots.add(cert).context(error::AddCertSnafu)?;
            }
        };
        // Always trust the OS-provided roots as well; `add_parsable_certificates`
        // skips (rather than errors on) certificates it cannot parse.
        roots.add_parsable_certificates(
            rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
        );

        let builder = builder.with_root_certificates(roots);
        // Mutual TLS only when BOTH paths are configured; supplying just one
        // of the two silently falls back to no client auth.
        let config = if let (Some(cert_path), Some(key_path)) =
            (&self.client_cert_path, &self.client_key_path)
        {
            let cert_bytes = tokio::fs::read(cert_path)
                .await
                .context(error::ReadFileSnafu { path: cert_path })?;
            let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(error::ReadCertsSnafu { path: cert_path })?;
            let key_bytes = tokio::fs::read(key_path)
                .await
                .context(error::ReadFileSnafu { path: key_path })?;
            // `private_key` yields Ok(None) when the PEM contains no key
            // entry; both the parse failure and the absence become errors.
            let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
                .context(error::ReadKeySnafu { path: key_path })?
                .context(error::KeyNotFoundSnafu { path: key_path })?;

            builder
                .with_client_auth_cert(client_certs, client_key)
                .context(error::SetClientAuthCertSnafu)?
        } else {
            builder.with_no_client_auth()
        };

        Ok(Arc::new(config))
    }
}
153
/// The connection configurations for kafka clients.
///
/// Every field falls back to its [`Default`] value when absent from the
/// deserialized input (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaConnectionConfig {
    /// The broker endpoints of the Kafka cluster.
    pub broker_endpoints: Vec<String>,
    /// Client SASL; `None` means no SASL authentication is configured.
    pub sasl: Option<KafkaClientSasl>,
    /// Client TLS config; `None` means TLS is not configured.
    pub tls: Option<KafkaClientTls>,
}
165
166impl Default for KafkaConnectionConfig {
167    fn default() -> Self {
168        Self {
169            broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
170            sasl: None,
171            tls: None,
172        }
173    }
174}
175
/// Topic configurations for kafka clients.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaTopicConfig {
    /// Number of topics.
    pub num_topics: usize,
    /// Number of partitions per topic.
    pub num_partitions: i32,
    /// The type of the topic selector with which to select a topic for a region.
    pub selector_type: TopicSelectorType,
    /// The replication factor of each topic.
    pub replication_factor: i16,
    /// The timeout of topic creation.
    ///
    /// Serialized in human-readable form (e.g. "30s") via `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub create_topic_timeout: Duration,
    /// Topic name prefix.
    pub topic_name_prefix: String,
}
194
195impl Default for KafkaTopicConfig {
196    fn default() -> Self {
197        Self {
198            num_topics: 64,
199            num_partitions: 1,
200            selector_type: TopicSelectorType::RoundRobin,
201            replication_factor: 1,
202            create_topic_timeout: Duration::from_secs(30),
203            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
204        }
205    }
206}