common_wal/config/kafka/
common.rs1use std::io::Cursor;
16use std::sync::Arc;
17use std::time::Duration;
18
19use rskafka::client::{Credentials, SaslConfig};
20use rskafka::BackoffConfig;
21use rustls::{ClientConfig, RootCertStore};
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt};
24
25pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
30 init_backoff: Duration::from_millis(100),
31 max_backoff: Duration::from_secs(1),
32 base: 3.0,
33 deadline: Some(Duration::from_secs(3)),
36};
37
/// Default auto-prune interval: [`Duration::ZERO`].
///
/// NOTE(review): presumably a zero interval means automatic pruning is turned off —
/// confirm against the code that consumes this constant.
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
/// Default auto-prune parallelism: 10.
///
/// NOTE(review): presumably the upper bound on concurrently running prune tasks — confirm.
pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
/// Default flush-trigger threshold: 0.
///
/// NOTE(review): unit and the meaning of zero are not visible here — confirm against the
/// code that consumes this constant.
pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0;
44
45use crate::error::{self, Result};
46use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct KafkaClientSasl {
51 #[serde(flatten)]
52 pub config: KafkaClientSaslConfig,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
56#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
57pub enum KafkaClientSaslConfig {
58 Plain {
59 username: String,
60 password: String,
61 },
62 #[serde(rename = "SCRAM-SHA-256")]
63 ScramSha256 {
64 username: String,
65 password: String,
66 },
67 #[serde(rename = "SCRAM-SHA-512")]
68 ScramSha512 {
69 username: String,
70 password: String,
71 },
72}
73
74impl KafkaClientSaslConfig {
75 pub fn into_sasl_config(self) -> SaslConfig {
77 match self {
78 KafkaClientSaslConfig::Plain { username, password } => {
79 SaslConfig::Plain(Credentials::new(username, password))
80 }
81 KafkaClientSaslConfig::ScramSha256 { username, password } => {
82 SaslConfig::ScramSha256(Credentials::new(username, password))
83 }
84 KafkaClientSaslConfig::ScramSha512 { username, password } => {
85 SaslConfig::ScramSha512(Credentials::new(username, password))
86 }
87 }
88 }
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
93pub struct KafkaClientTls {
94 pub server_ca_cert_path: Option<String>,
95 pub client_cert_path: Option<String>,
96 pub client_key_path: Option<String>,
97}
98
99impl KafkaClientTls {
100 pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
102 let builder = ClientConfig::builder();
103 let mut roots = RootCertStore::empty();
104
105 if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
106 let root_cert_bytes =
107 tokio::fs::read(&server_ca_cert_path)
108 .await
109 .context(error::ReadFileSnafu {
110 path: server_ca_cert_path,
111 })?;
112 let mut cursor = Cursor::new(root_cert_bytes);
113 for cert in rustls_pemfile::certs(&mut cursor)
114 .collect::<std::result::Result<Vec<_>, _>>()
115 .context(error::ReadCertsSnafu {
116 path: server_ca_cert_path,
117 })?
118 {
119 roots.add(cert).context(error::AddCertSnafu)?;
120 }
121 };
122 roots.add_parsable_certificates(
123 rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
124 );
125
126 let builder = builder.with_root_certificates(roots);
127 let config = if let (Some(cert_path), Some(key_path)) =
128 (&self.client_cert_path, &self.client_key_path)
129 {
130 let cert_bytes = tokio::fs::read(cert_path)
131 .await
132 .context(error::ReadFileSnafu { path: cert_path })?;
133 let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
134 .collect::<std::result::Result<Vec<_>, _>>()
135 .context(error::ReadCertsSnafu { path: cert_path })?;
136 let key_bytes = tokio::fs::read(key_path)
137 .await
138 .context(error::ReadFileSnafu { path: key_path })?;
139 let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
140 .context(error::ReadKeySnafu { path: key_path })?
141 .context(error::KeyNotFoundSnafu { path: key_path })?;
142
143 builder
144 .with_client_auth_cert(client_certs, client_key)
145 .context(error::SetClientAuthCertSnafu)?
146 } else {
147 builder.with_no_client_auth()
148 };
149
150 Ok(Arc::new(config))
151 }
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
156#[serde(default)]
157pub struct KafkaConnectionConfig {
158 pub broker_endpoints: Vec<String>,
160 pub sasl: Option<KafkaClientSasl>,
162 pub tls: Option<KafkaClientTls>,
164}
165
166impl Default for KafkaConnectionConfig {
167 fn default() -> Self {
168 Self {
169 broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
170 sasl: None,
171 tls: None,
172 }
173 }
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
178#[serde(default)]
179pub struct KafkaTopicConfig {
180 pub num_topics: usize,
182 pub num_partitions: i32,
184 pub selector_type: TopicSelectorType,
186 pub replication_factor: i16,
188 #[serde(with = "humantime_serde")]
190 pub create_topic_timeout: Duration,
191 pub topic_name_prefix: String,
193}
194
195impl Default for KafkaTopicConfig {
196 fn default() -> Self {
197 Self {
198 num_topics: 64,
199 num_partitions: 1,
200 selector_type: TopicSelectorType::RoundRobin,
201 replication_factor: 1,
202 create_topic_timeout: Duration::from_secs(30),
203 topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
204 }
205 }
206}