1use std::net::SocketAddr;
16
17use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
18use serde::{Deserialize, Serialize};
19use snafu::{OptionExt, ResultExt};
20use tokio::net;
21
22pub mod config;
23pub mod error;
24pub mod options;
25#[cfg(any(test, feature = "testing"))]
26pub mod test_util;
27
28pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";
29pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
30
31#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
34#[serde(rename_all = "snake_case")]
35pub enum TopicSelectorType {
36 #[default]
37 RoundRobin,
38}
39
40pub async fn resolve_to_ipv4<T: AsRef<str>>(endpoints: &[T]) -> Result<Vec<String>> {
41 futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await
42}
43
44async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
45 let endpoint = endpoint.as_ref();
46 net::lookup_host(endpoint)
47 .await
48 .context(ResolveEndpointSnafu {
49 broker_endpoint: endpoint,
50 })?
51 .find(SocketAddr::is_ipv4)
52 .map(|addr| addr.to_string())
53 .context(EndpointIPV4NotFoundSnafu {
54 broker_endpoint: endpoint,
55 })
56}
57
58#[cfg(test)]
59mod tests {
60 use std::assert_matches;
61
62 use common_telemetry::warn;
63 use rskafka::client::{Credentials, SaslConfig};
64
65 use super::*;
66 use crate::error::Error;
67
68 #[tokio::test]
70 async fn test_valid_host() {
71 let host = "localhost:9092";
72 let got = resolve_to_ipv4_one(host).await;
73 assert_eq!(got.unwrap(), "127.0.0.1:9092");
74 }
75
76 #[tokio::test]
77 async fn test_valid_host_ipv6() {
78 let host = "::1:9092";
80 let got = resolve_to_ipv4_one(host).await;
81 assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. });
82 }
83
84 #[tokio::test]
85 async fn test_invalid_host() {
86 let host = "non-exist-host:9092";
87 let got = resolve_to_ipv4_one(host).await;
88 assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
89 }
90
91 #[tokio::test]
92 async fn test_sasl() {
93 common_telemetry::init_default_ut_logging();
94 let Ok(broker_endpoints) = std::env::var("GT_KAFKA_SASL_ENDPOINTS") else {
95 warn!("The endpoints is empty, skipping the test 'test_sasl'");
96 return;
97 };
98 let broker_endpoints = broker_endpoints
99 .split(',')
100 .map(|s| s.trim().to_string())
101 .collect::<Vec<_>>();
102
103 let username = "user_kafka";
104 let password = "secret";
105 let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
106 .sasl_config(SaslConfig::Plain(Credentials::new(
107 username.to_string(),
108 password.to_string(),
109 )))
110 .build()
111 .await
112 .unwrap();
113 let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
114 .sasl_config(SaslConfig::ScramSha256(Credentials::new(
115 username.to_string(),
116 password.to_string(),
117 )))
118 .build()
119 .await
120 .unwrap();
121 let _ = rskafka::client::ClientBuilder::new(broker_endpoints)
122 .sasl_config(SaslConfig::ScramSha512(Credentials::new(
123 username.to_string(),
124 password.to_string(),
125 )))
126 .build()
127 .await
128 .unwrap();
129 }
130}