log_store/kafka/client_manager.rs

use std::collections::HashMap;
use std::sync::Arc;

use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use dashmap::DashMap;
use rskafka::client::ClientBuilder;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::log_store::TopicStat;
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};

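/// The partition the log store writes to and reads from; clients are always
/// built against partition 0 of a topic.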
pub const DEFAULT_PARTITION: i32 = 0;

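/// A shared, reference-counted [`ClientManager`].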
pub(crate) type ClientManagerRef = Arc<ClientManager>;

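/// A client bound to one topic: the partition client for direct access and the
/// ordered batch producer that buffers writes to the same partition.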
#[derive(Debug, Clone)]
pub(crate) struct Client {
    client: Arc<PartitionClient>,
    producer: OrderedBatchProducerRef,
}

impl Client {
    pub(crate) fn client(&self) -> &Arc<PartitionClient> {
        &self.client
    }

    pub(crate) fn producer(&self) -> &OrderedBatchProducerRef {
        &self.producer
    }
}

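/// Manages the construction of and access to per-topic [`Client`]s.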
#[derive(Debug)]
pub(crate) struct ClientManager {
    client: rskafka::client::Client,
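    /// Serializes client creation so racing callers build each topic's client
    /// only once.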
    mutex: Mutex<()>,
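    /// Clients already created, keyed by their topic provider.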
    instances: RwLock<HashMap<Arc<KafkaProvider>, Client>>,
    global_index_collector: Option<GlobalIndexCollector>,

    flush_batch_size: usize,
    compression: Compression,

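    /// Per-topic statistics, shared with each topic's [`OrderedBatchProducer`].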
    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}

impl ClientManager {
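    /// Tries to create a [`ClientManager`], building the underlying rskafka
    /// client from the datanode's Kafka configuration.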
    pub(crate) async fn try_new(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
        topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
    ) -> Result<Self> {
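        // Backoff, connect, and request timeouts come from the connection
        // config; SASL and TLS are layered on only when configured.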
        let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
            .backoff_config(DEFAULT_BACKOFF_CONFIG)
            .connect_timeout(Some(config.connection.connect_timeout))
            .timeout(Some(config.connection.timeout));
        if let Some(sasl) = &config.connection.sasl {
            builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
        }
        if let Some(tls) = &config.connection.tls {
            builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
        }

        let client = builder.build().await.with_context(|_| BuildClientSnafu {
            broker_endpoints: config.connection.broker_endpoints.clone(),
        })?;

        Ok(Self {
            client,
            mutex: Mutex::new(()),
            instances: RwLock::new(HashMap::new()),
            flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
            compression: Compression::Lz4,
            global_index_collector,
            topic_stats,
        })
    }

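    /// Creates and caches a client for the topic while holding `mutex`,
    /// re-checking the cache first so racing callers end up sharing one client.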
    async fn try_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let _guard = self.mutex.lock().await;

        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => {
                let client = self.try_create_client(provider).await?;
                self.instances
                    .write()
                    .await
                    .insert(provider.clone(), client.clone());
                self.topic_stats
                    .insert(provider.clone(), TopicStat::default());
                Ok(client)
            }
        }
    }

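    /// Returns the topic's [`Client`], creating and caching it on first use.
    ///
    /// A hypothetical usage sketch; the topic name is invented and the example
    /// is `ignore`d since it needs a live broker:
    ///
    /// ```ignore
    /// let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
    /// let client = manager.get_or_insert(&provider).await?;
    /// let partition_client = client.client(); // talks to partition 0
    /// ```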
    pub(crate) async fn get_or_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => self.try_insert(provider).await,
        }
    }

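    /// Builds a [`PartitionClient`] for the topic's default partition (retrying
    /// while the topic is still unknown to the broker) and pairs it with a new
    /// [`OrderedBatchProducer`].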
    async fn try_create_client(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let client = self
            .client
            .partition_client(
                provider.topic.as_str(),
                DEFAULT_PARTITION,
                UnknownTopicHandling::Retry,
            )
            .await
            .context(BuildPartitionClientSnafu {
                topic: &provider.topic,
                partition: DEFAULT_PARTITION,
            })
            .map(Arc::new)?;

        let (tx, rx) = OrderedBatchProducer::channel();
        let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
            global_collector
                .provider_level_index_collector(provider.clone(), tx.clone())
                .await
        } else {
            Box::new(NoopCollector)
        };
        let producer = Arc::new(OrderedBatchProducer::new(
            (tx, rx),
            provider.clone(),
            client.clone(),
            self.compression,
            self.flush_batch_size,
            index_collector,
            self.topic_stats.clone(),
        ));

        Ok(Client { client, producer })
    }

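    /// Returns the global index collector, if index collection is enabled.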
    pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
        self.global_index_collector.as_ref()
    }

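    /// Lists the providers of all topics that currently have cached clients.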
    pub(crate) async fn list_topics(&self) -> Vec<Arc<KafkaProvider>> {
        self.instances.read().await.keys().cloned().collect()
    }

    #[cfg(test)]
    pub(crate) fn topic_stats(&self) -> &Arc<DashMap<Arc<KafkaProvider>, TopicStat>> {
        &self.topic_stats
    }
}

#[cfg(test)]
impl ClientManager {
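    /// Test-only access to the controller client, e.g. for creating topics.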
    pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
        self.client.controller_client().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use tokio::sync::Barrier;

    use super::*;
    use crate::kafka::test_util::prepare;

    #[tokio::test]
    async fn test_sequential() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
        let region_topic = (0..512)
            .map(|region_id| (region_id, &topics[region_id % topics.len()]))
            .collect::<HashMap<_, _>>();

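        // Warm the manager with every mapped topic; repeats hit the cached client.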
        for (_, topic) in region_topic {
            let provider = Arc::new(KafkaProvider::new(topic.clone()));
            manager.get_or_insert(&provider).await.unwrap();
        }

        let client_pool = manager.instances.read().await;
        let all_exist = topics.iter().all(|topic| {
            let provider = Arc::new(KafkaProvider::new(topic.clone()));
            client_pool.contains_key(&provider)
        });
        assert!(all_exist);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
        let region_topic = (0..512)
            .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
            .collect::<HashMap<_, _>>();

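        // Release all tasks at once so `get_or_insert` races on the same
        // topics; `try_insert`'s mutex should keep creation single-shot.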
        let manager = Arc::new(manager);
        let barrier = Arc::new(Barrier::new(region_topic.len()));
        let tasks = region_topic
            .into_values()
            .map(|topic| {
                let manager = manager.clone();
                let barrier = barrier.clone();

                tokio::spawn(async move {
                    barrier.wait().await;
                    let provider = Arc::new(KafkaProvider::new(topic));
                    assert!(manager.get_or_insert(&provider).await.is_ok());
                })
            })
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.unwrap();

        let client_pool = manager.instances.read().await;
        let all_exist = topics.iter().all(|topic| {
            let provider = Arc::new(KafkaProvider::new(topic.clone()));
            client_pool.contains_key(&provider)
        });
        assert!(all_exist);
    }
}