log_store/kafka/client_manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
use rskafka::client::ClientBuilder;
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};

use crate::error::{
    BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
pub const DEFAULT_PARTITION: i32 = 0;

/// Arc wrapper of ClientManager.
pub(crate) type ClientManagerRef = Arc<ClientManager>;

/// Topic client.
#[derive(Debug, Clone)]
pub(crate) struct Client {
    client: Arc<PartitionClient>,
    producer: OrderedBatchProducerRef,
}

impl Client {
    pub(crate) fn client(&self) -> &Arc<PartitionClient> {
        &self.client
    }

    pub(crate) fn producer(&self) -> &OrderedBatchProducerRef {
        &self.producer
    }
}

/// Manages client construction and access.
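///
/// A minimal usage sketch (hypothetical caller; it assumes an already loaded
/// [DatanodeKafkaConfig] and a reachable Kafka cluster, so it is marked `ignore`):
///
/// ```ignore
/// let manager = Arc::new(
///     ClientManager::try_new(&config, None, Arc::new(DashMap::new())).await?,
/// );
/// let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
/// let client = manager.get_or_insert(&provider).await?;
/// let _partition_client = client.client();
/// ```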
#[derive(Debug)]
pub(crate) struct ClientManager {
    client: rskafka::client::Client,
    /// Ensures only one caller at a time initializes a new [Client].
    mutex: Mutex<()>,
    instances: RwLock<HashMap<Arc<KafkaProvider>, Client>>,
    global_index_collector: Option<GlobalIndexCollector>,

    flush_batch_size: usize,
    compression: Compression,

    /// High watermark for each topic.
    high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}

impl ClientManager {
    /// Tries to create a ClientManager.
    pub(crate) async fn try_new(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
    ) -> Result<Self> {
        // Sets backoff config for the top-level kafka client and all clients constructed by it.
        let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
            .await
            .context(ResolveKafkaEndpointSnafu)?;
        let mut builder =
            ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
        if let Some(sasl) = &config.connection.sasl {
            builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
        };
        if let Some(tls) = &config.connection.tls {
            builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
        };

        let client = builder.build().await.with_context(|_| BuildClientSnafu {
            broker_endpoints: config.connection.broker_endpoints.clone(),
        })?;

        Ok(Self {
            client,
            mutex: Mutex::new(()),
            instances: RwLock::new(HashMap::new()),
            flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
            compression: Compression::Lz4,
            global_index_collector,
            high_watermark,
        })
    }

    async fn try_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let _guard = self.mutex.lock().await;

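        // Double-check under the lock: another caller may have created the client for this
        // provider while we were waiting for the mutex, in which case we reuse it instead of
        // building a second partition client and producer.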
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => {
                let client = self.try_create_client(provider).await?;
                self.instances
                    .write()
                    .await
                    .insert(provider.clone(), client.clone());
                self.high_watermark.insert(provider.clone(), 0);
                Ok(client)
            }
        }
    }

    /// Gets the client associated with the topic. If the client does not exist, a new one will
    /// be created and returned.
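    ///
    /// The lookup takes the shared read lock first; only a cache miss falls through to
    /// `try_insert`, which serializes client creation behind `mutex`.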
    pub(crate) async fn get_or_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => self.try_insert(provider).await,
        }
    }

    async fn try_create_client(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        // Sets `UnknownTopicHandling::Retry` to keep retrying if the kafka cluster replies with an UnknownTopic error.
        // That's because the topic is believed to exist, as the metasrv is expected to create the required topics upon start.
        // The retries won't stop until they succeed or a different error is returned.
        let client = self
            .client
            .partition_client(
                provider.topic.as_str(),
                DEFAULT_PARTITION,
                UnknownTopicHandling::Retry,
            )
            .await
            .context(BuildPartitionClientSnafu {
                topic: &provider.topic,
                partition: DEFAULT_PARTITION,
            })
            .map(Arc::new)?;

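        // Builds the per-topic producer, which batches appends (bounded by `flush_batch_size`
        // bytes) and compresses them with `self.compression` before writing through the
        // partition client. The index collector is a no-op unless a global collector is configured.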
        let (tx, rx) = OrderedBatchProducer::channel();
        let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
            global_collector
                .provider_level_index_collector(provider.clone(), tx.clone())
                .await
        } else {
            Box::new(NoopCollector)
        };
        let producer = Arc::new(OrderedBatchProducer::new(
            (tx, rx),
            provider.clone(),
            client.clone(),
            self.compression,
            self.flush_batch_size,
            index_collector,
            self.high_watermark.clone(),
        ));

        Ok(Client { client, producer })
    }

    pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
        self.global_index_collector.as_ref()
    }

    #[cfg(test)]
    pub(crate) fn high_watermark(&self) -> &Arc<DashMap<Arc<KafkaProvider>, u64>> {
        &self.high_watermark
    }
}

#[cfg(test)]
mod tests {
    use common_wal::test_util::run_test_with_kafka_wal;
    use tokio::sync::Barrier;

    use super::*;
    use crate::kafka::test_util::prepare;

    /// Sends `get_or_insert` requests sequentially to the client manager and checks that it handles them correctly.
    #[tokio::test]
    async fn test_sequential() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
                // Assigns multiple regions to a topic.
                let region_topic = (0..512)
                    .map(|region_id| (region_id, &topics[region_id % topics.len()]))
                    .collect::<HashMap<_, _>>();

                // Gets all clients sequentially.
                for (_, topic) in region_topic {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    manager.get_or_insert(&provider).await.unwrap();
                }

                // Ensures all clients exist.
                let client_pool = manager.instances.read().await;
                let all_exist = topics.iter().all(|topic| {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    client_pool.contains_key(&provider)
                });
                assert!(all_exist);
            })
        })
        .await;
    }

    /// Sends `get_or_insert` requests in parallel to the client manager and checks that it handles them correctly.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
                // Assigns multiple regions to a topic.
                let region_topic = (0..512)
                    .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
                    .collect::<HashMap<_, _>>();

                // Gets all clients in parallel.
                let manager = Arc::new(manager);
                let barrier = Arc::new(Barrier::new(region_topic.len()));
                let tasks = region_topic
                    .into_values()
                    .map(|topic| {
                        let manager = manager.clone();
                        let barrier = barrier.clone();

                        tokio::spawn(async move {
                            barrier.wait().await;
                            let provider = Arc::new(KafkaProvider::new(topic));
                            assert!(manager.get_or_insert(&provider).await.is_ok());
                        })
                    })
                    .collect::<Vec<_>>();
                futures::future::try_join_all(tasks).await.unwrap();

                // Ensures all clients exist.
                let client_pool = manager.instances.read().await;
                let all_exist = topics.iter().all(|topic| {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    client_pool.contains_key(&provider)
                });
                assert!(all_exist);
            })
        })
        .await;
    }
}