common_wal/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(assert_matches)]
16#![feature(duration_constructors_lite)]
17
18use std::net::SocketAddr;
19
20use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::net;
24
25pub mod config;
26pub mod error;
27pub mod options;
28#[cfg(any(test, feature = "testing"))]
29pub mod test_util;
30
/// Broker endpoint literal: the IPv4 loopback address on port 9092.
// NOTE(review): presumably the default Kafka broker address for the WAL
// configuration; confirm against the users of this constant.
pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";

/// Topic name prefix literal.
// NOTE(review): presumably prepended to generated WAL topic names; confirm
// against the users of this constant.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
33
34/// The type of the topic selector, i.e. with which strategy to select a topic.
35// The enum is defined here to work around cyclic dependency issues.
36#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38pub enum TopicSelectorType {
39    #[default]
40    RoundRobin,
41}
42
43pub async fn resolve_to_ipv4<T: AsRef<str>>(endpoints: &[T]) -> Result<Vec<String>> {
44    futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await
45}
46
47async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
48    let endpoint = endpoint.as_ref();
49    net::lookup_host(endpoint)
50        .await
51        .context(ResolveEndpointSnafu {
52            broker_endpoint: endpoint,
53        })?
54        .find(SocketAddr::is_ipv4)
55        .map(|addr| addr.to_string())
56        .context(EndpointIPV4NotFoundSnafu {
57            broker_endpoint: endpoint,
58        })
59}
60
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_telemetry::warn;
    use rskafka::client::{Credentials, SaslConfig};

    use super::*;
    use crate::error::Error;

    // Tests for `resolve_to_ipv4_one`.

    /// `localhost` is expected to resolve to the IPv4 loopback address, port preserved.
    #[tokio::test]
    async fn test_valid_host() {
        let resolved = resolve_to_ipv4_one("localhost:9092").await.unwrap();
        assert_eq!(resolved, "127.0.0.1:9092");
    }

    /// The host is valid but is an IPv6 address; only IPv4 addresses are accepted.
    #[tokio::test]
    async fn test_valid_host_ipv6() {
        let err = resolve_to_ipv4_one("::1:9092").await.unwrap_err();
        assert_matches!(err, Error::EndpointIPV4NotFound { .. });
    }

    /// A host that cannot be resolved surfaces as a `ResolveEndpoint` error.
    #[tokio::test]
    async fn test_invalid_host() {
        let err = resolve_to_ipv4_one("non-exist-host:9092")
            .await
            .unwrap_err();
        assert_matches!(err, Error::ResolveEndpoint { .. });
    }

    /// Builds an rskafka client once per SASL mechanism (PLAIN, SCRAM-SHA-256,
    /// SCRAM-SHA-512) against the brokers listed in `GT_KAFKA_SASL_ENDPOINTS`.
    ///
    /// The test returns early, after logging a warning, when that variable cannot be read.
    #[tokio::test]
    async fn test_sasl() {
        common_telemetry::init_default_ut_logging();

        let raw_endpoints = match std::env::var("GT_KAFKA_SASL_ENDPOINTS") {
            Ok(value) => value,
            Err(_) => {
                warn!("The endpoints is empty, skipping the test 'test_sasl'");
                return;
            }
        };
        // The variable holds a comma-separated list; surrounding whitespace is dropped.
        let broker_endpoints: Vec<String> = raw_endpoints
            .split(',')
            .map(|endpoint| endpoint.trim().to_string())
            .collect();

        // Every mechanism is exercised with the same fixed credentials.
        let credentials = || Credentials::new("user_kafka".to_string(), "secret".to_string());
        let sasl_configs = [
            SaslConfig::Plain(credentials()),
            SaslConfig::ScramSha256(credentials()),
            SaslConfig::ScramSha512(credentials()),
        ];

        for sasl_config in sasl_configs {
            // Only a successful build matters; the client itself is discarded.
            let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
                .sasl_config(sasl_config)
                .build()
                .await
                .unwrap();
        }
    }
}