common_wal/config/kafka/
common.rs1use std::io::Cursor;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use rskafka::BackoffConfig;
21use rskafka::client::{Credentials, SaslConfig};
22use rustls::{ClientConfig, RootCertStore};
23use serde::{Deserialize, Serialize};
24use snafu::{OptionExt, ResultExt};
25
26pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
31    init_backoff: Duration::from_millis(100),
32    max_backoff: Duration::from_secs(1),
33    base: 3.0,
34    deadline: Some(Duration::from_secs(3)),
37};
38
39pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
41pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
43pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
45pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);
47
48use crate::error::{self, Result};
49use crate::{BROKER_ENDPOINT, TOPIC_NAME_PREFIX, TopicSelectorType};
50
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub struct KafkaClientSasl {
54    #[serde(flatten)]
55    pub config: KafkaClientSaslConfig,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
60pub enum KafkaClientSaslConfig {
61    Plain {
62        username: String,
63        password: String,
64    },
65    #[serde(rename = "SCRAM-SHA-256")]
66    ScramSha256 {
67        username: String,
68        password: String,
69    },
70    #[serde(rename = "SCRAM-SHA-512")]
71    ScramSha512 {
72        username: String,
73        password: String,
74    },
75}
76
77impl KafkaClientSaslConfig {
78    pub fn into_sasl_config(self) -> SaslConfig {
80        match self {
81            KafkaClientSaslConfig::Plain { username, password } => {
82                SaslConfig::Plain(Credentials::new(username, password))
83            }
84            KafkaClientSaslConfig::ScramSha256 { username, password } => {
85                SaslConfig::ScramSha256(Credentials::new(username, password))
86            }
87            KafkaClientSaslConfig::ScramSha512 { username, password } => {
88                SaslConfig::ScramSha512(Credentials::new(username, password))
89            }
90        }
91    }
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
96pub struct KafkaClientTls {
97    pub server_ca_cert_path: Option<String>,
98    pub client_cert_path: Option<String>,
99    pub client_key_path: Option<String>,
100}
101
102impl KafkaClientTls {
103    pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
105        let builder = ClientConfig::builder();
106        let mut roots = RootCertStore::empty();
107
108        if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
109            let root_cert_bytes =
110                tokio::fs::read(&server_ca_cert_path)
111                    .await
112                    .context(error::ReadFileSnafu {
113                        path: server_ca_cert_path,
114                    })?;
115            let mut cursor = Cursor::new(root_cert_bytes);
116            for cert in rustls_pemfile::certs(&mut cursor)
117                .collect::<std::result::Result<Vec<_>, _>>()
118                .context(error::ReadCertsSnafu {
119                    path: server_ca_cert_path,
120                })?
121            {
122                roots.add(cert).context(error::AddCertSnafu)?;
123            }
124        };
125        roots.add_parsable_certificates(
126            rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
127        );
128
129        let builder = builder.with_root_certificates(roots);
130        let config = if let (Some(cert_path), Some(key_path)) =
131            (&self.client_cert_path, &self.client_key_path)
132        {
133            let cert_bytes = tokio::fs::read(cert_path)
134                .await
135                .context(error::ReadFileSnafu { path: cert_path })?;
136            let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
137                .collect::<std::result::Result<Vec<_>, _>>()
138                .context(error::ReadCertsSnafu { path: cert_path })?;
139            let key_bytes = tokio::fs::read(key_path)
140                .await
141                .context(error::ReadFileSnafu { path: key_path })?;
142            let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
143                .context(error::ReadKeySnafu { path: key_path })?
144                .context(error::KeyNotFoundSnafu { path: key_path })?;
145
146            builder
147                .with_client_auth_cert(client_certs, client_key)
148                .context(error::SetClientAuthCertSnafu)?
149        } else {
150            builder.with_no_client_auth()
151        };
152
153        Ok(Arc::new(config))
154    }
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
159#[serde(default)]
160pub struct KafkaConnectionConfig {
161    pub broker_endpoints: Vec<String>,
163    pub sasl: Option<KafkaClientSasl>,
165    pub tls: Option<KafkaClientTls>,
167}
168
169impl Default for KafkaConnectionConfig {
170    fn default() -> Self {
171        Self {
172            broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
173            sasl: None,
174            tls: None,
175        }
176    }
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
181#[serde(default)]
182pub struct KafkaTopicConfig {
183    pub num_topics: usize,
185    pub num_partitions: i32,
187    pub selector_type: TopicSelectorType,
189    pub replication_factor: i16,
191    #[serde(with = "humantime_serde")]
193    pub create_topic_timeout: Duration,
194    pub topic_name_prefix: String,
196}
197
198impl Default for KafkaTopicConfig {
199    fn default() -> Self {
200        Self {
201            num_topics: 64,
202            num_partitions: 1,
203            selector_type: TopicSelectorType::RoundRobin,
204            replication_factor: 1,
205            create_topic_timeout: Duration::from_secs(30),
206            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
207        }
208    }
209}