common_wal/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(assert_matches)]
16
17use std::net::SocketAddr;
18
19use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
20use serde::{Deserialize, Serialize};
21use snafu::{OptionExt, ResultExt};
22use tokio::net;
23
24pub mod config;
25pub mod error;
26pub mod options;
27#[cfg(any(test, feature = "testing"))]
28pub mod test_util;
29
/// Broker endpoint in `host:port` form, pointing at the local loopback address.
// NOTE(review): presumably the default Kafka broker endpoint used when none is
// configured — confirm against the `config` module.
pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";

/// Common prefix string for WAL topic names.
// NOTE(review): presumably prepended to generated topic names — confirm
// against the topic-creation code.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
32
33/// The type of the topic selector, i.e. with which strategy to select a topic.
34// The enum is defined here to work around cyclic dependency issues.
35#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36#[serde(rename_all = "snake_case")]
37pub enum TopicSelectorType {
38    #[default]
39    RoundRobin,
40}
41
42pub async fn resolve_to_ipv4<T: AsRef<str>>(endpoints: &[T]) -> Result<Vec<String>> {
43    futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await
44}
45
46async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
47    let endpoint = endpoint.as_ref();
48    net::lookup_host(endpoint)
49        .await
50        .context(ResolveEndpointSnafu {
51            broker_endpoint: endpoint,
52        })?
53        .find(SocketAddr::is_ipv4)
54        .map(|addr| addr.to_string())
55        .context(EndpointIPV4NotFoundSnafu {
56            broker_endpoint: endpoint,
57        })
58}
59
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_telemetry::warn;
    use rskafka::client::{Credentials, SaslConfig};

    use super::*;
    use crate::error::Error;

    // Tests for `resolve_to_ipv4_one`.

    /// A host name that resolves to an IPv4 address is returned as `ip:port`.
    #[tokio::test]
    async fn test_valid_host() {
        let host = "localhost:9092";
        let got = resolve_to_ipv4_one(host).await;
        assert_eq!(got.unwrap(), "127.0.0.1:9092");
    }

    /// The host is valid but it is an IPv6 address; only IPv4 addresses are
    /// accepted, so resolution must report that no IPv4 address was found.
    #[tokio::test]
    async fn test_valid_host_ipv6() {
        let host = "::1:9092";
        let got = resolve_to_ipv4_one(host).await;
        assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. });
    }

    /// A host name that cannot be looked up yields a resolution error.
    #[tokio::test]
    async fn test_invalid_host() {
        let host = "non-exist-host:9092";
        let got = resolve_to_ipv4_one(host).await;
        assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
    }

    /// Builds a Kafka client with each SASL mechanism against the brokers named
    /// in `GT_KAFKA_SASL_ENDPOINTS` (comma separated).
    ///
    /// The test is skipped when the variable is unset or contains no usable
    /// endpoint, so it is a no-op on machines without a SASL-enabled broker.
    #[tokio::test]
    async fn test_sasl() {
        common_telemetry::init_default_ut_logging();

        // Treat an unset variable and an empty/blank one the same way. Blank
        // entries (e.g. from a trailing comma) are dropped so that an empty
        // string is never handed to the client as a broker address.
        let broker_endpoints = std::env::var("GT_KAFKA_SASL_ENDPOINTS")
            .map(|raw| {
                raw.split(',')
                    .map(str::trim)
                    .filter(|endpoint| !endpoint.is_empty())
                    .map(str::to_string)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        if broker_endpoints.is_empty() {
            warn!("The endpoints is empty, skipping the test 'test_sasl'");
            return;
        }

        let username = "user_kafka";
        let password = "secret";

        // Tuple-variant constructors coerce to plain function pointers, which
        // lets every mechanism share the same client-building code.
        let mechanisms: [(&str, fn(Credentials) -> SaslConfig); 3] = [
            ("PLAIN", SaslConfig::Plain),
            ("SCRAM-SHA-256", SaslConfig::ScramSha256),
            ("SCRAM-SHA-512", SaslConfig::ScramSha512),
        ];
        for (name, mechanism) in mechanisms {
            let credentials = Credentials::new(username.to_string(), password.to_string());
            let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
                .sasl_config(mechanism(credentials))
                .build()
                .await
                .unwrap_or_else(|e| panic!("failed to build client with SASL {name}: {e:?}"));
        }
    }
}