common_wal/config/kafka/
common.rs1use std::io::Cursor;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use rskafka::client::{Credentials, SaslConfig};
21use rskafka::BackoffConfig;
22use rustls::{ClientConfig, RootCertStore};
23use serde::{Deserialize, Serialize};
24use snafu::{OptionExt, ResultExt};
25
26pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
31 init_backoff: Duration::from_millis(100),
32 max_backoff: Duration::from_secs(1),
33 base: 3.0,
34 deadline: Some(Duration::from_secs(3)),
37};
38
39pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
41pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
43pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
45pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);
47
48use crate::error::{self, Result};
49use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
50
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub struct KafkaClientSasl {
54 #[serde(flatten)]
55 pub config: KafkaClientSaslConfig,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
60pub enum KafkaClientSaslConfig {
61 Plain {
62 username: String,
63 password: String,
64 },
65 #[serde(rename = "SCRAM-SHA-256")]
66 ScramSha256 {
67 username: String,
68 password: String,
69 },
70 #[serde(rename = "SCRAM-SHA-512")]
71 ScramSha512 {
72 username: String,
73 password: String,
74 },
75}
76
77impl KafkaClientSaslConfig {
78 pub fn into_sasl_config(self) -> SaslConfig {
80 match self {
81 KafkaClientSaslConfig::Plain { username, password } => {
82 SaslConfig::Plain(Credentials::new(username, password))
83 }
84 KafkaClientSaslConfig::ScramSha256 { username, password } => {
85 SaslConfig::ScramSha256(Credentials::new(username, password))
86 }
87 KafkaClientSaslConfig::ScramSha512 { username, password } => {
88 SaslConfig::ScramSha512(Credentials::new(username, password))
89 }
90 }
91 }
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
96pub struct KafkaClientTls {
97 pub server_ca_cert_path: Option<String>,
98 pub client_cert_path: Option<String>,
99 pub client_key_path: Option<String>,
100}
101
102impl KafkaClientTls {
103 pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
105 let builder = ClientConfig::builder();
106 let mut roots = RootCertStore::empty();
107
108 if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
109 let root_cert_bytes =
110 tokio::fs::read(&server_ca_cert_path)
111 .await
112 .context(error::ReadFileSnafu {
113 path: server_ca_cert_path,
114 })?;
115 let mut cursor = Cursor::new(root_cert_bytes);
116 for cert in rustls_pemfile::certs(&mut cursor)
117 .collect::<std::result::Result<Vec<_>, _>>()
118 .context(error::ReadCertsSnafu {
119 path: server_ca_cert_path,
120 })?
121 {
122 roots.add(cert).context(error::AddCertSnafu)?;
123 }
124 };
125 roots.add_parsable_certificates(
126 rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
127 );
128
129 let builder = builder.with_root_certificates(roots);
130 let config = if let (Some(cert_path), Some(key_path)) =
131 (&self.client_cert_path, &self.client_key_path)
132 {
133 let cert_bytes = tokio::fs::read(cert_path)
134 .await
135 .context(error::ReadFileSnafu { path: cert_path })?;
136 let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
137 .collect::<std::result::Result<Vec<_>, _>>()
138 .context(error::ReadCertsSnafu { path: cert_path })?;
139 let key_bytes = tokio::fs::read(key_path)
140 .await
141 .context(error::ReadFileSnafu { path: key_path })?;
142 let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
143 .context(error::ReadKeySnafu { path: key_path })?
144 .context(error::KeyNotFoundSnafu { path: key_path })?;
145
146 builder
147 .with_client_auth_cert(client_certs, client_key)
148 .context(error::SetClientAuthCertSnafu)?
149 } else {
150 builder.with_no_client_auth()
151 };
152
153 Ok(Arc::new(config))
154 }
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
159#[serde(default)]
160pub struct KafkaConnectionConfig {
161 pub broker_endpoints: Vec<String>,
163 pub sasl: Option<KafkaClientSasl>,
165 pub tls: Option<KafkaClientTls>,
167}
168
169impl Default for KafkaConnectionConfig {
170 fn default() -> Self {
171 Self {
172 broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
173 sasl: None,
174 tls: None,
175 }
176 }
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
181#[serde(default)]
182pub struct KafkaTopicConfig {
183 pub num_topics: usize,
185 pub num_partitions: i32,
187 pub selector_type: TopicSelectorType,
189 pub replication_factor: i16,
191 #[serde(with = "humantime_serde")]
193 pub create_topic_timeout: Duration,
194 pub topic_name_prefix: String,
196}
197
198impl Default for KafkaTopicConfig {
199 fn default() -> Self {
200 Self {
201 num_topics: 64,
202 num_partitions: 1,
203 selector_type: TopicSelectorType::RoundRobin,
204 replication_factor: 1,
205 create_topic_timeout: Duration::from_secs(30),
206 topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
207 }
208 }
209}