common_wal/config/kafka/
common.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Cursor;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use rskafka::client::{Credentials, SaslConfig};
21use rskafka::BackoffConfig;
22use rustls::{ClientConfig, RootCertStore};
23use serde::{Deserialize, Serialize};
24use snafu::{OptionExt, ResultExt};
25
26/// The default backoff config for kafka client.
27///
28/// If the operation fails, the client will retry 3 times.
29/// The backoff time is 100ms, 300ms, 900ms.
30pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
31    init_backoff: Duration::from_millis(100),
32    max_backoff: Duration::from_secs(1),
33    base: 3.0,
34    // The deadline shouldn't be too long,
35    // otherwise the client will block the worker loop for a long time.
36    deadline: Some(Duration::from_secs(3)),
37};
38
39/// Default interval for auto WAL pruning.
40pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
41/// Default limit for concurrent auto pruning tasks.
42pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
43/// Default size of WAL to trigger flush.
44pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
45/// Default checkpoint trigger size.
46pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);
47
48use crate::error::{self, Result};
49use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
50
51/// The SASL configurations for kafka client.
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub struct KafkaClientSasl {
54    #[serde(flatten)]
55    pub config: KafkaClientSaslConfig,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
60pub enum KafkaClientSaslConfig {
61    Plain {
62        username: String,
63        password: String,
64    },
65    #[serde(rename = "SCRAM-SHA-256")]
66    ScramSha256 {
67        username: String,
68        password: String,
69    },
70    #[serde(rename = "SCRAM-SHA-512")]
71    ScramSha512 {
72        username: String,
73        password: String,
74    },
75}
76
77impl KafkaClientSaslConfig {
78    /// Converts to [`SaslConfig`].
79    pub fn into_sasl_config(self) -> SaslConfig {
80        match self {
81            KafkaClientSaslConfig::Plain { username, password } => {
82                SaslConfig::Plain(Credentials::new(username, password))
83            }
84            KafkaClientSaslConfig::ScramSha256 { username, password } => {
85                SaslConfig::ScramSha256(Credentials::new(username, password))
86            }
87            KafkaClientSaslConfig::ScramSha512 { username, password } => {
88                SaslConfig::ScramSha512(Credentials::new(username, password))
89            }
90        }
91    }
92}
93
94/// The TLS configurations for kafka client.
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
96pub struct KafkaClientTls {
97    pub server_ca_cert_path: Option<String>,
98    pub client_cert_path: Option<String>,
99    pub client_key_path: Option<String>,
100}
101
102impl KafkaClientTls {
103    /// Builds the [`ClientConfig`].
104    pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
105        let builder = ClientConfig::builder();
106        let mut roots = RootCertStore::empty();
107
108        if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
109            let root_cert_bytes =
110                tokio::fs::read(&server_ca_cert_path)
111                    .await
112                    .context(error::ReadFileSnafu {
113                        path: server_ca_cert_path,
114                    })?;
115            let mut cursor = Cursor::new(root_cert_bytes);
116            for cert in rustls_pemfile::certs(&mut cursor)
117                .collect::<std::result::Result<Vec<_>, _>>()
118                .context(error::ReadCertsSnafu {
119                    path: server_ca_cert_path,
120                })?
121            {
122                roots.add(cert).context(error::AddCertSnafu)?;
123            }
124        };
125        roots.add_parsable_certificates(
126            rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
127        );
128
129        let builder = builder.with_root_certificates(roots);
130        let config = if let (Some(cert_path), Some(key_path)) =
131            (&self.client_cert_path, &self.client_key_path)
132        {
133            let cert_bytes = tokio::fs::read(cert_path)
134                .await
135                .context(error::ReadFileSnafu { path: cert_path })?;
136            let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
137                .collect::<std::result::Result<Vec<_>, _>>()
138                .context(error::ReadCertsSnafu { path: cert_path })?;
139            let key_bytes = tokio::fs::read(key_path)
140                .await
141                .context(error::ReadFileSnafu { path: key_path })?;
142            let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
143                .context(error::ReadKeySnafu { path: key_path })?
144                .context(error::KeyNotFoundSnafu { path: key_path })?;
145
146            builder
147                .with_client_auth_cert(client_certs, client_key)
148                .context(error::SetClientAuthCertSnafu)?
149        } else {
150            builder.with_no_client_auth()
151        };
152
153        Ok(Arc::new(config))
154    }
155}
156
157/// The connection configurations for kafka clients.
158#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
159#[serde(default)]
160pub struct KafkaConnectionConfig {
161    /// The broker endpoints of the Kafka cluster.
162    pub broker_endpoints: Vec<String>,
163    /// Client SASL.
164    pub sasl: Option<KafkaClientSasl>,
165    /// Client TLS config
166    pub tls: Option<KafkaClientTls>,
167}
168
169impl Default for KafkaConnectionConfig {
170    fn default() -> Self {
171        Self {
172            broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
173            sasl: None,
174            tls: None,
175        }
176    }
177}
178
179/// Topic configurations for kafka clients.
180#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
181#[serde(default)]
182pub struct KafkaTopicConfig {
183    /// Number of topics.
184    pub num_topics: usize,
185    /// Number of partitions per topic.
186    pub num_partitions: i32,
187    /// The type of the topic selector with which to select a topic for a region.
188    pub selector_type: TopicSelectorType,
189    /// The replication factor of each topic.
190    pub replication_factor: i16,
191    /// The timeout of topic creation.
192    #[serde(with = "humantime_serde")]
193    pub create_topic_timeout: Duration,
194    /// Topic name prefix.
195    pub topic_name_prefix: String,
196}
197
198impl Default for KafkaTopicConfig {
199    fn default() -> Self {
200        Self {
201            num_topics: 64,
202            num_partitions: 1,
203            selector_type: TopicSelectorType::RoundRobin,
204            replication_factor: 1,
205            create_topic_timeout: Duration::from_secs(30),
206            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
207        }
208    }
209}