1#![feature(assert_matches)]
16
17use std::net::SocketAddr;
18
19use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
20use serde::{Deserialize, Serialize};
21use snafu::{OptionExt, ResultExt};
22use tokio::net;
23
24pub mod config;
25pub mod error;
26pub mod options;
27#[cfg(any(test, feature = "testing"))]
28pub mod test_util;
29
30pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";
31pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
32
33#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36#[serde(rename_all = "snake_case")]
37pub enum TopicSelectorType {
38 #[default]
39 RoundRobin,
40}
41
42pub async fn resolve_to_ipv4<T: AsRef<str>>(endpoints: &[T]) -> Result<Vec<String>> {
43 futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await
44}
45
46async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
47 let endpoint = endpoint.as_ref();
48 net::lookup_host(endpoint)
49 .await
50 .context(ResolveEndpointSnafu {
51 broker_endpoint: endpoint,
52 })?
53 .find(SocketAddr::is_ipv4)
54 .map(|addr| addr.to_string())
55 .context(EndpointIPV4NotFoundSnafu {
56 broker_endpoint: endpoint,
57 })
58}
59
60#[cfg(test)]
61mod tests {
62 use std::assert_matches::assert_matches;
63
64 use common_telemetry::warn;
65 use rskafka::client::{Credentials, SaslConfig};
66
67 use super::*;
68 use crate::error::Error;
69
70 #[tokio::test]
72 async fn test_valid_host() {
73 let host = "localhost:9092";
74 let got = resolve_to_ipv4_one(host).await;
75 assert_eq!(got.unwrap(), "127.0.0.1:9092");
76 }
77
78 #[tokio::test]
79 async fn test_valid_host_ipv6() {
80 let host = "::1:9092";
82 let got = resolve_to_ipv4_one(host).await;
83 assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. });
84 }
85
86 #[tokio::test]
87 async fn test_invalid_host() {
88 let host = "non-exist-host:9092";
89 let got = resolve_to_ipv4_one(host).await;
90 assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
91 }
92
93 #[tokio::test]
94 async fn test_sasl() {
95 common_telemetry::init_default_ut_logging();
96 let Ok(broker_endpoints) = std::env::var("GT_KAFKA_SASL_ENDPOINTS") else {
97 warn!("The endpoints is empty, skipping the test 'test_sasl'");
98 return;
99 };
100 let broker_endpoints = broker_endpoints
101 .split(',')
102 .map(|s| s.trim().to_string())
103 .collect::<Vec<_>>();
104
105 let username = "user_kafka";
106 let password = "secret";
107 let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
108 .sasl_config(SaslConfig::Plain(Credentials::new(
109 username.to_string(),
110 password.to_string(),
111 )))
112 .build()
113 .await
114 .unwrap();
115 let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
116 .sasl_config(SaslConfig::ScramSha256(Credentials::new(
117 username.to_string(),
118 password.to_string(),
119 )))
120 .build()
121 .await
122 .unwrap();
123 let _ = rskafka::client::ClientBuilder::new(broker_endpoints)
124 .sasl_config(SaslConfig::ScramSha512(Credentials::new(
125 username.to_string(),
126 password.to_string(),
127 )))
128 .build()
129 .await
130 .unwrap();
131 }
132}