1#![feature(assert_matches)]
16#![feature(duration_constructors_lite)]
17
18use std::net::SocketAddr;
19
20use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::net;
24
25pub mod config;
26pub mod error;
27pub mod options;
28#[cfg(any(test, feature = "testing"))]
29pub mod test_util;
30
/// Loopback broker endpoint in `host:port` form.
///
/// NOTE(review): presumably the default Kafka broker address for the WAL
/// configuration (9092 is Kafka's conventional port) — confirm against the
/// `config`/`options` modules.
pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";

/// Prefix string for WAL topic names.
///
/// NOTE(review): presumably a numeric suffix is appended by the topic-creation
/// code to form full topic names — confirm at the call sites.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
33
34#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38pub enum TopicSelectorType {
39 #[default]
40 RoundRobin,
41}
42
43pub async fn resolve_to_ipv4<T: AsRef<str>>(endpoints: &[T]) -> Result<Vec<String>> {
44 futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await
45}
46
47async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
48 let endpoint = endpoint.as_ref();
49 net::lookup_host(endpoint)
50 .await
51 .context(ResolveEndpointSnafu {
52 broker_endpoint: endpoint,
53 })?
54 .find(SocketAddr::is_ipv4)
55 .map(|addr| addr.to_string())
56 .context(EndpointIPV4NotFoundSnafu {
57 broker_endpoint: endpoint,
58 })
59}
60
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_telemetry::warn;
    use rskafka::client::{Credentials, SaslConfig};

    use super::*;
    use crate::error::Error;

    /// A host that has an IPv4 mapping resolves to its IPv4 socket address.
    #[tokio::test]
    async fn test_valid_host() {
        let host = "localhost:9092";
        let got = resolve_to_ipv4_one(host).await;
        assert_eq!(got.unwrap(), "127.0.0.1:9092");
    }

    /// A host that only resolves to an IPv6 address is rejected.
    #[tokio::test]
    async fn test_valid_host_ipv6() {
        // The host is valid, but it is an IPv6 address and only IPv4 is accepted.
        let host = "::1:9092";
        let got = resolve_to_ipv4_one(host).await;
        assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. });
    }

    /// A host that cannot be resolved surfaces the lookup error.
    #[tokio::test]
    async fn test_invalid_host() {
        // `.invalid` is a reserved TLD (RFC 6761) that must never resolve, so the
        // test does not depend on search domains or NXDOMAIN-hijacking resolvers.
        let host = "non-exist-host.invalid:9092";
        let got = resolve_to_ipv4_one(host).await;
        assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
    }

    /// Connects to the brokers listed in `GT_KAFKA_SASL_ENDPOINTS` with each SASL
    /// mechanism; skipped when no endpoint is configured.
    #[tokio::test]
    async fn test_sasl() {
        common_telemetry::init_default_ut_logging();
        // Treat an unset variable and a blank value the same way: drop empty
        // entries so that `""` or a trailing comma never reaches the client.
        let broker_endpoints = std::env::var("GT_KAFKA_SASL_ENDPOINTS")
            .unwrap_or_default()
            .split(',')
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .map(str::to_string)
            .collect::<Vec<_>>();
        if broker_endpoints.is_empty() {
            warn!("The endpoints is empty, skipping the test 'test_sasl'");
            return;
        }

        let username = "user_kafka";
        let password = "secret";
        // Each mechanism is built separately so a panic points at the failing one.
        let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
            .sasl_config(SaslConfig::Plain(Credentials::new(
                username.to_string(),
                password.to_string(),
            )))
            .build()
            .await
            .unwrap();
        let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
            .sasl_config(SaslConfig::ScramSha256(Credentials::new(
                username.to_string(),
                password.to_string(),
            )))
            .build()
            .await
            .unwrap();
        let _ = rskafka::client::ClientBuilder::new(broker_endpoints)
            .sasl_config(SaslConfig::ScramSha512(Credentials::new(
                username.to_string(),
                password.to_string(),
            )))
            .build()
            .await
            .unwrap();
    }
}