log_store/kafka/client_manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

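//! Manages the `rskafka` clients used by the Kafka log store: a [PartitionClient]
//! and an [OrderedBatchProducer] are created and cached per topic by the
//! [ClientManager].
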
use std::collections::HashMap;
use std::sync::Arc;

use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use dashmap::DashMap;
use rskafka::client::ClientBuilder;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::log_store::TopicStat;
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
pub const DEFAULT_PARTITION: i32 = 0;

/// Arc wrapper of ClientManager.
pub(crate) type ClientManagerRef = Arc<ClientManager>;

/// Topic client.
#[derive(Debug, Clone)]
pub(crate) struct Client {
    client: Arc<PartitionClient>,
    producer: OrderedBatchProducerRef,
}

impl Client {
    pub(crate) fn client(&self) -> &Arc<PartitionClient> {
        &self.client
    }

    pub(crate) fn producer(&self) -> &OrderedBatchProducerRef {
        &self.producer
    }
}

/// Manages client construction and access.
#[derive(Debug)]
pub(crate) struct ClientManager {
    /// The top-level kafka client from which all partition clients are built.
    client: rskafka::client::Client,
    /// Serializes the initialization of new [Client]s so that at most one is
    /// created per topic.
    mutex: Mutex<()>,
    /// The created [Client]s, keyed by topic provider.
    instances: RwLock<HashMap<Arc<KafkaProvider>, Client>>,
    global_index_collector: Option<GlobalIndexCollector>,

    flush_batch_size: usize,
    compression: Compression,

    /// The stats of each topic.
    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}

impl ClientManager {
    /// Tries to create a ClientManager.
    pub(crate) async fn try_new(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
        topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
    ) -> Result<Self> {
        // Sets backoff config for the top-level kafka client and all clients constructed by it.
        let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
            .backoff_config(DEFAULT_BACKOFF_CONFIG)
            .connect_timeout(Some(config.connection.connect_timeout))
            .timeout(Some(config.connection.timeout));
        if let Some(sasl) = &config.connection.sasl {
            builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
        }
        if let Some(tls) = &config.connection.tls {
            builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?);
        }

        let client = builder.build().await.with_context(|_| BuildClientSnafu {
            broker_endpoints: config.connection.broker_endpoints.clone(),
        })?;

        Ok(Self {
            client,
            mutex: Mutex::new(()),
            instances: RwLock::new(HashMap::new()),
            flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
            compression: Compression::Lz4,
            global_index_collector,
            topic_stats,
        })
    }

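    /// Creates a [Client] for the provider and registers it in `instances`, unless
    /// another task has already created one while we were waiting for the mutex.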
    async fn try_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let _guard = self.mutex.lock().await;

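        // Double-checks under the mutex: another task may have created the client
        // while this task was waiting for the lock.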
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => {
                let client = self.try_create_client(provider).await?;
                self.instances
                    .write()
                    .await
                    .insert(provider.clone(), client.clone());
                self.topic_stats
                    .insert(provider.clone(), TopicStat::default());
                Ok(client)
            }
        }
    }

    /// Gets the client associated with the topic. If the client does not exist, a new one will
    /// be created and returned.
    pub(crate) async fn get_or_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
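        // Fast path: only takes the read lock when the client already exists.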
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => self.try_insert(provider).await,
        }
    }

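    /// Builds a [PartitionClient] for the topic and bundles it with a new
    /// [OrderedBatchProducer] into a [Client].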
    async fn try_create_client(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        // Uses `Retry` to keep reconnecting if the kafka cluster replies with an UnknownTopic error.
        // That's because the topic is believed to exist, as the metasrv is expected to create all required topics upon start.
        // The retry won't stop until it succeeds or a different error is returned.
        let client = self
            .client
            .partition_client(
                provider.topic.as_str(),
                DEFAULT_PARTITION,
                UnknownTopicHandling::Retry,
            )
            .await
            .context(BuildPartitionClientSnafu {
                topic: &provider.topic,
                partition: DEFAULT_PARTITION,
            })
            .map(Arc::new)?;

        let (tx, rx) = OrderedBatchProducer::channel();
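        // Collects entry indexes for this provider only when a global index collector
        // is configured; otherwise index collection is a no-op.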
        let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
            global_collector
                .provider_level_index_collector(provider.clone(), tx.clone())
                .await
        } else {
            Box::new(NoopCollector)
        };
        let producer = Arc::new(OrderedBatchProducer::new(
            (tx, rx),
            provider.clone(),
            client.clone(),
            self.compression,
            self.flush_batch_size,
            index_collector,
            self.topic_stats.clone(),
        ));

        Ok(Client { client, producer })
    }

    /// Returns the [GlobalIndexCollector] if one is configured.
    pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
        self.global_index_collector.as_ref()
    }

    /// Lists all topics that have created clients.
    pub(crate) async fn list_topics(&self) -> Vec<Arc<KafkaProvider>> {
        self.instances.read().await.keys().cloned().collect()
    }

    #[cfg(test)]
    pub(crate) fn topic_stats(&self) -> &Arc<DashMap<Arc<KafkaProvider>, TopicStat>> {
        &self.topic_stats
    }
}

#[cfg(test)]
impl ClientManager {
    /// Returns the controller client.
    pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
        self.client.controller_client().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use tokio::sync::Barrier;

    use super::*;
    use crate::kafka::test_util::prepare;

    /// Sends `get_or_insert` requests sequentially to the client manager, and checks that it handles them correctly.
    #[tokio::test]
    async fn test_sequential() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
        // Assigns multiple regions to a topic.
        let region_topic = (0..512)
            .map(|region_id| (region_id, &topics[region_id % topics.len()]))
            .collect::<HashMap<_, _>>();

        // Gets all clients sequentially.
        for (_, topic) in region_topic {
            let provider = Arc::new(KafkaProvider::new(topic.clone()));
            manager.get_or_insert(&provider).await.unwrap();
        }

        // Ensures all clients exist.
        let client_pool = manager.instances.read().await;
        let all_exist = topics.iter().all(|topic| {
            let provider = Arc::new(KafkaProvider::new(topic.clone()));
            client_pool.contains_key(&provider)
        });
        assert!(all_exist);
    }

    /// Sends `get_or_insert` requests in parallel to the client manager, and checks that it handles them correctly.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
        // Assigns multiple regions to a topic.
        let region_topic = (0..512)
            .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
            .collect::<HashMap<_, _>>();

        // Gets all clients in parallel.
        let manager = Arc::new(manager);
        let barrier = Arc::new(Barrier::new(region_topic.len()));
        let tasks = region_topic
            .into_values()
            .map(|topic| {
                let manager = manager.clone();
                let barrier = barrier.clone();

                tokio::spawn(async move {
                    barrier.wait().await;
                    let provider = Arc::new(KafkaProvider::new(topic));
                    assert!(manager.get_or_insert(&provider).await.is_ok());
                })
            })
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.unwrap();

        // Ensures all clients exist.
        let client_pool = manager.instances.read().await;
        let all_exist = topics.iter().all(|topic| {
            let provider = Arc::new(KafkaProvider::new(topic.clone()));
            client_pool.contains_key(&provider)
        });
        assert!(all_exist);
    }
}