common_wal/config/kafka/
common.rs1use std::io::Cursor;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use rskafka::BackoffConfig;
21use rskafka::client::{Credentials, SaslConfig};
22use rustls::{ClientConfig, RootCertStore};
23use serde::{Deserialize, Serialize};
24use snafu::{OptionExt, ResultExt};
25
26pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
31 init_backoff: Duration::from_millis(100),
32 max_backoff: Duration::from_secs(1),
33 base: 3.0,
34 deadline: Some(Duration::from_secs(3)),
37};
38
/// Default connect timeout: 10 seconds.
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Default interval between automatic prune runs: 30 minutes.
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_secs(30 * 60);
/// Default parallelism used for automatic pruning.
pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
46pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
48pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);
50
51use crate::error::{self, Result};
52use crate::{BROKER_ENDPOINT, TOPIC_NAME_PREFIX, TopicSelectorType};
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
56pub struct KafkaClientSasl {
57 #[serde(flatten)]
58 pub config: KafkaClientSaslConfig,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
63pub enum KafkaClientSaslConfig {
64 Plain {
65 username: String,
66 password: String,
67 },
68 #[serde(rename = "SCRAM-SHA-256")]
69 ScramSha256 {
70 username: String,
71 password: String,
72 },
73 #[serde(rename = "SCRAM-SHA-512")]
74 ScramSha512 {
75 username: String,
76 password: String,
77 },
78}
79
80impl KafkaClientSaslConfig {
81 pub fn into_sasl_config(self) -> SaslConfig {
83 match self {
84 KafkaClientSaslConfig::Plain { username, password } => {
85 SaslConfig::Plain(Credentials::new(username, password))
86 }
87 KafkaClientSaslConfig::ScramSha256 { username, password } => {
88 SaslConfig::ScramSha256(Credentials::new(username, password))
89 }
90 KafkaClientSaslConfig::ScramSha512 { username, password } => {
91 SaslConfig::ScramSha512(Credentials::new(username, password))
92 }
93 }
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
99pub struct KafkaClientTls {
100 pub server_ca_cert_path: Option<String>,
101 pub client_cert_path: Option<String>,
102 pub client_key_path: Option<String>,
103}
104
105impl KafkaClientTls {
106 pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
108 let builder = ClientConfig::builder();
109 let mut roots = RootCertStore::empty();
110
111 if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
112 let root_cert_bytes =
113 tokio::fs::read(&server_ca_cert_path)
114 .await
115 .context(error::ReadFileSnafu {
116 path: server_ca_cert_path,
117 })?;
118 let mut cursor = Cursor::new(root_cert_bytes);
119 for cert in rustls_pemfile::certs(&mut cursor)
120 .collect::<std::result::Result<Vec<_>, _>>()
121 .context(error::ReadCertsSnafu {
122 path: server_ca_cert_path,
123 })?
124 {
125 roots.add(cert).context(error::AddCertSnafu)?;
126 }
127 };
128 roots.add_parsable_certificates(
129 rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
130 );
131
132 let builder = builder.with_root_certificates(roots);
133 let config = if let (Some(cert_path), Some(key_path)) =
134 (&self.client_cert_path, &self.client_key_path)
135 {
136 let cert_bytes = tokio::fs::read(cert_path)
137 .await
138 .context(error::ReadFileSnafu { path: cert_path })?;
139 let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
140 .collect::<std::result::Result<Vec<_>, _>>()
141 .context(error::ReadCertsSnafu { path: cert_path })?;
142 let key_bytes = tokio::fs::read(key_path)
143 .await
144 .context(error::ReadFileSnafu { path: key_path })?;
145 let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
146 .context(error::ReadKeySnafu { path: key_path })?
147 .context(error::KeyNotFoundSnafu { path: key_path })?;
148
149 builder
150 .with_client_auth_cert(client_certs, client_key)
151 .context(error::SetClientAuthCertSnafu)?
152 } else {
153 builder.with_no_client_auth()
154 };
155
156 Ok(Arc::new(config))
157 }
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
162#[serde(default)]
163pub struct KafkaConnectionConfig {
164 pub broker_endpoints: Vec<String>,
166 pub sasl: Option<KafkaClientSasl>,
168 pub tls: Option<KafkaClientTls>,
170}
171
172impl Default for KafkaConnectionConfig {
173 fn default() -> Self {
174 Self {
175 broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
176 sasl: None,
177 tls: None,
178 }
179 }
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
184#[serde(default)]
185pub struct KafkaTopicConfig {
186 pub num_topics: usize,
188 pub num_partitions: i32,
190 pub selector_type: TopicSelectorType,
192 pub replication_factor: i16,
194 #[serde(with = "humantime_serde")]
196 pub create_topic_timeout: Duration,
197 pub topic_name_prefix: String,
199}
200
201impl Default for KafkaTopicConfig {
202 fn default() -> Self {
203 Self {
204 num_topics: 64,
205 num_partitions: 1,
206 selector_type: TopicSelectorType::RoundRobin,
207 replication_factor: 1,
208 create_topic_timeout: Duration::from_secs(30),
209 topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
210 }
211 }
212}