Skip to main content

common_wal/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16
17use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
18use serde::{Deserialize, Serialize};
19use snafu::{OptionExt, ResultExt};
20use tokio::net;
21
22pub mod config;
23pub mod error;
24pub mod options;
25#[cfg(any(test, feature = "testing"))]
26pub mod test_util;
27
28pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";
29pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
30
31/// The type of the topic selector, i.e. with which strategy to select a topic.
32// The enum is defined here to work around cyclic dependency issues.
33#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
34#[serde(rename_all = "snake_case")]
35pub enum TopicSelectorType {
36    #[default]
37    RoundRobin,
38}
39
40pub async fn resolve_to_ipv4<T: AsRef<str>>(endpoints: &[T]) -> Result<Vec<String>> {
41    futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await
42}
43
44async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
45    let endpoint = endpoint.as_ref();
46    net::lookup_host(endpoint)
47        .await
48        .context(ResolveEndpointSnafu {
49            broker_endpoint: endpoint,
50        })?
51        .find(SocketAddr::is_ipv4)
52        .map(|addr| addr.to_string())
53        .context(EndpointIPV4NotFoundSnafu {
54            broker_endpoint: endpoint,
55        })
56}
57
58#[cfg(test)]
59mod tests {
60    use std::assert_matches;
61
62    use common_telemetry::warn;
63    use rskafka::client::{Credentials, SaslConfig};
64
65    use super::*;
66    use crate::error::Error;
67
68    // test for resolve_broker_endpoint
69    #[tokio::test]
70    async fn test_valid_host() {
71        let host = "localhost:9092";
72        let got = resolve_to_ipv4_one(host).await;
73        assert_eq!(got.unwrap(), "127.0.0.1:9092");
74    }
75
76    #[tokio::test]
77    async fn test_valid_host_ipv6() {
78        // the host is valid, it is an IPv6 address, but we only accept IPv4 addresses
79        let host = "::1:9092";
80        let got = resolve_to_ipv4_one(host).await;
81        assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. });
82    }
83
84    #[tokio::test]
85    async fn test_invalid_host() {
86        let host = "non-exist-host:9092";
87        let got = resolve_to_ipv4_one(host).await;
88        assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
89    }
90
91    #[tokio::test]
92    async fn test_sasl() {
93        common_telemetry::init_default_ut_logging();
94        let Ok(broker_endpoints) = std::env::var("GT_KAFKA_SASL_ENDPOINTS") else {
95            warn!("The endpoints is empty, skipping the test 'test_sasl'");
96            return;
97        };
98        let broker_endpoints = broker_endpoints
99            .split(',')
100            .map(|s| s.trim().to_string())
101            .collect::<Vec<_>>();
102
103        let username = "user_kafka";
104        let password = "secret";
105        let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
106            .sasl_config(SaslConfig::Plain(Credentials::new(
107                username.to_string(),
108                password.to_string(),
109            )))
110            .build()
111            .await
112            .unwrap();
113        let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
114            .sasl_config(SaslConfig::ScramSha256(Credentials::new(
115                username.to_string(),
116                password.to_string(),
117            )))
118            .build()
119            .await
120            .unwrap();
121        let _ = rskafka::client::ClientBuilder::new(broker_endpoints)
122            .sasl_config(SaslConfig::ScramSha512(Credentials::new(
123                username.to_string(),
124                password.to_string(),
125            )))
126            .build()
127            .await
128            .unwrap();
129    }
130}