log_store/kafka/client_manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
use rskafka::client::ClientBuilder;
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};

use crate::error::{
    BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
pub const DEFAULT_PARTITION: i32 = 0;

/// Arc wrapper of ClientManager.
pub(crate) type ClientManagerRef = Arc<ClientManager>;

/// Per-topic client, bundling the underlying [PartitionClient] with the
/// [OrderedBatchProducer] built on top of it.
#[derive(Debug, Clone)]
pub(crate) struct Client {
    client: Arc<PartitionClient>,
    producer: OrderedBatchProducerRef,
}

impl Client {
    pub(crate) fn client(&self) -> &Arc<PartitionClient> {
        &self.client
    }

    pub(crate) fn producer(&self) -> &OrderedBatchProducerRef {
        &self.producer
    }
}

/// Manages client construction and access; clients are created lazily and
/// cached per topic.
#[derive(Debug)]
pub(crate) struct ClientManager {
    /// Top-level Kafka client from which per-topic partition clients are built.
    client: rskafka::client::Client,
    /// Serializes the initialization of a new [Client].
    mutex: Mutex<()>,
    /// Cached clients, keyed by provider (topic).
    instances: RwLock<HashMap<Arc<KafkaProvider>, Client>>,
    /// Optional global index collector shared by all per-topic collectors.
    global_index_collector: Option<GlobalIndexCollector>,

    /// Maximum batch size in bytes before the producer flushes.
    flush_batch_size: usize,
    /// Compression applied to produced batches.
    compression: Compression,

    /// High watermark for each topic.
    high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}

impl ClientManager {
    /// Tries to create a ClientManager.
    pub(crate) async fn try_new(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
    ) -> Result<Self> {
        // Sets backoff config for the top-level kafka client and all clients constructed by it.
        let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
            .await
            .context(ResolveKafkaEndpointSnafu)?;
        let mut builder =
            ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
        if let Some(sasl) = &config.connection.sasl {
            builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
        };
        if let Some(tls) = &config.connection.tls {
            builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
        };

        let client = builder.build().await.with_context(|_| BuildClientSnafu {
            broker_endpoints: config.connection.broker_endpoints.clone(),
        })?;

        Ok(Self {
            client,
            mutex: Mutex::new(()),
            instances: RwLock::new(HashMap::new()),
            flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
            compression: Compression::Lz4,
            global_index_collector,
            high_watermark,
        })
    }
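
    // A minimal construction sketch (illustrative only, not part of the original source). It
    // assumes a default `DatanodeKafkaConfig` is sufficient and that no global index collector
    // is wanted:
    //
    //     let config = DatanodeKafkaConfig::default();
    //     let high_watermark = Arc::new(DashMap::new());
    //     let manager = Arc::new(ClientManager::try_new(&config, None, high_watermark).await?);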

    async fn try_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let _guard = self.mutex.lock().await;

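        // Double-checks under the lock: another task may have created and inserted the client
        // between the optimistic read in `get_or_insert` and acquiring the mutex above.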
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => {
                let client = self.try_create_client(provider).await?;
                self.instances
                    .write()
                    .await
                    .insert(provider.clone(), client.clone());
                self.high_watermark.insert(provider.clone(), 0);
                Ok(client)
            }
        }
    }

    /// Gets the client associated with the topic. If the client does not exist, a new one will
    /// be created and returned.
    pub(crate) async fn get_or_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => self.try_insert(provider).await,
        }
    }
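
    // A hedged usage sketch (illustrative only, not part of the original source), assuming an
    // async context and an existing `manager: ClientManagerRef`; the topic name is hypothetical:
    //
    //     let provider = Arc::new(KafkaProvider::new("some_wal_topic".to_string()));
    //     let client = manager.get_or_insert(&provider).await?;
    //     // `client.producer()` is the per-topic ordered batch producer used for writes;
    //     // `client.client()` exposes the underlying `PartitionClient`.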

    async fn try_create_client(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        // Uses `UnknownTopicHandling::Retry` to keep retrying if the Kafka cluster replies with
        // an UnknownTopic error: the topic is expected to exist because the metasrv creates the
        // required topics on startup. Retrying stops only when the call succeeds or a different
        // error is returned.
        let client = self
            .client
            .partition_client(
                provider.topic.as_str(),
                DEFAULT_PARTITION,
                UnknownTopicHandling::Retry,
            )
            .await
            .context(BuildPartitionClientSnafu {
                topic: &provider.topic,
                partition: DEFAULT_PARTITION,
            })
            .map(Arc::new)?;

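        // The producer's channel sender is also handed to the provider-level index collector
        // (when a global collector is configured); otherwise a no-op collector is used.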
        let (tx, rx) = OrderedBatchProducer::channel();
        let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
            global_collector
                .provider_level_index_collector(provider.clone(), tx.clone())
                .await
        } else {
            Box::new(NoopCollector)
        };
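        // Wires the per-topic producer with the compression, flush batch size, index collector,
        // and shared high watermark map configured on this manager.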
        let producer = Arc::new(OrderedBatchProducer::new(
            (tx, rx),
            provider.clone(),
            client.clone(),
            self.compression,
            self.flush_batch_size,
            index_collector,
            self.high_watermark.clone(),
        ));

        Ok(Client { client, producer })
    }

    pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
        self.global_index_collector.as_ref()
    }

    #[cfg(test)]
    pub(crate) fn high_watermark(&self) -> &Arc<DashMap<Arc<KafkaProvider>, u64>> {
        &self.high_watermark
    }
}

#[cfg(test)]
impl ClientManager {
    /// Returns the controller client.
    pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
        self.client.controller_client().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use common_wal::test_util::run_test_with_kafka_wal;
    use tokio::sync::Barrier;

    use super::*;
    use crate::kafka::test_util::prepare;

    /// Sends `get_or_insert` requests sequentially to the client manager and checks that it handles them correctly.
    #[tokio::test]
    async fn test_sequential() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
                // Assigns multiple regions to a topic.
                let region_topic = (0..512)
                    .map(|region_id| (region_id, &topics[region_id % topics.len()]))
                    .collect::<HashMap<_, _>>();

                // Gets all clients sequentially.
                for (_, topic) in region_topic {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    manager.get_or_insert(&provider).await.unwrap();
                }

                // Ensures all clients exist.
                let client_pool = manager.instances.read().await;
                let all_exist = topics.iter().all(|topic| {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    client_pool.contains_key(&provider)
                });
                assert!(all_exist);
            })
        })
        .await;
    }

    /// Sends `get_or_insert` requests in parallel to the client manager and checks that it handles them correctly.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
                // Assigns multiple regions to a topic.
                let region_topic = (0..512)
                    .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
                    .collect::<HashMap<_, _>>();

                // Gets all clients in parallel.
                let manager = Arc::new(manager);
                let barrier = Arc::new(Barrier::new(region_topic.len()));
                let tasks = region_topic
                    .into_values()
                    .map(|topic| {
                        let manager = manager.clone();
                        let barrier = barrier.clone();

                        tokio::spawn(async move {
                            barrier.wait().await;
                            let provider = Arc::new(KafkaProvider::new(topic));
                            assert!(manager.get_or_insert(&provider).await.is_ok());
                        })
                    })
                    .collect::<Vec<_>>();
                futures::future::try_join_all(tasks).await.unwrap();

                // Ensures all clients exist.
                let client_pool = manager.instances.read().await;
                let all_exist = topics.iter().all(|topic| {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    client_pool.contains_key(&provider)
                });
                assert!(all_exist);
            })
        })
        .await;
    }
}