// log_store/kafka/client_manager.rs

use std::collections::HashMap;
use std::sync::Arc;

use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
use rskafka::client::ClientBuilder;
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};

use crate::error::{
    BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};

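// All topic reads and writes in this module go through this single default partition.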
pub const DEFAULT_PARTITION: i32 = 0;

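/// Arc wrapper of [`ClientManager`].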
pub(crate) type ClientManagerRef = Arc<ClientManager>;

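/// A partition client for a topic, paired with the producer that batches writes to it.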
#[derive(Debug, Clone)]
pub(crate) struct Client {
    client: Arc<PartitionClient>,
    producer: OrderedBatchProducerRef,
}

impl Client {
    pub(crate) fn client(&self) -> &Arc<PartitionClient> {
        &self.client
    }

    pub(crate) fn producer(&self) -> &OrderedBatchProducerRef {
        &self.producer
    }
}

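/// Manages a [`Client`] per Kafka topic: clients are created lazily on first
/// access and cached for reuse.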
#[derive(Debug)]
pub(crate) struct ClientManager {
    /// The underlying rskafka client shared by all partition clients.
    client: rskafka::client::Client,
    /// Serializes client creation in `try_insert` so concurrent callers do not
    /// build duplicate clients for the same topic.
    mutex: Mutex<()>,
    /// Cached clients, keyed by topic provider.
    instances: RwLock<HashMap<Arc<KafkaProvider>, Client>>,
    global_index_collector: Option<GlobalIndexCollector>,

    /// The batch size in bytes at which producers flush, derived from `max_batch_bytes`.
    flush_batch_size: usize,
    compression: Compression,

    /// The high watermark per topic, shared with the producers that maintain it.
    high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}

impl ClientManager {
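    /// Tries to create a [`ClientManager`] connected to the configured brokers.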
    pub(crate) async fn try_new(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
    ) -> Result<Self> {
        // Resolves hostnames in the broker endpoints to IPv4 addresses.
        let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
            .await
            .context(ResolveKafkaEndpointSnafu)?;
        let mut builder =
            ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
        if let Some(sasl) = &config.connection.sasl {
            builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
        }
        if let Some(tls) = &config.connection.tls {
            builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?);
        }

        let client = builder.build().await.with_context(|_| BuildClientSnafu {
            broker_endpoints: config.connection.broker_endpoints.clone(),
        })?;

        Ok(Self {
            client,
            mutex: Mutex::new(()),
            instances: RwLock::new(HashMap::new()),
            flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
            compression: Compression::Lz4,
            global_index_collector,
            high_watermark,
        })
    }

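    /// Creates a [`Client`] for the provider and caches it. The mutex ensures
    /// that at most one client is built per topic under concurrent calls.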
    async fn try_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        let _guard = self.mutex.lock().await;

        // Double-checks the cache: another task may have created the client
        // while this one was waiting on the mutex.
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => {
                let client = self.try_create_client(provider).await?;
                self.instances
                    .write()
                    .await
                    .insert(provider.clone(), client.clone());
                // Tracks the new topic with an initial high watermark of 0.
                self.high_watermark.insert(provider.clone(), 0);
                Ok(client)
            }
        }
    }

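    /// Returns the cached [`Client`] for the provider's topic, creating and
    /// caching a new one on first access.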
    pub(crate) async fn get_or_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        // Fast path: only takes the read lock.
        let client = self.instances.read().await.get(provider).cloned();
        match client {
            Some(client) => Ok(client),
            None => self.try_insert(provider).await,
        }
    }

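    /// Builds a [`PartitionClient`] and an [`OrderedBatchProducer`] for the
    /// provider's topic.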
    async fn try_create_client(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
        // `UnknownTopicHandling::Retry` keeps retrying while the topic is not
        // yet known to the cluster instead of erroring out immediately.
        let client = self
            .client
            .partition_client(
                provider.topic.as_str(),
                DEFAULT_PARTITION,
                UnknownTopicHandling::Retry,
            )
            .await
            .context(BuildPartitionClientSnafu {
                topic: &provider.topic,
                partition: DEFAULT_PARTITION,
            })
            .map(Arc::new)?;

        let (tx, rx) = OrderedBatchProducer::channel();
        // Collects entry indexes per topic if a global collector is configured;
        // otherwise indexing is a no-op.
        let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
            global_collector
                .provider_level_index_collector(provider.clone(), tx.clone())
                .await
        } else {
            Box::new(NoopCollector)
        };
        let producer = Arc::new(OrderedBatchProducer::new(
            (tx, rx),
            provider.clone(),
            client.clone(),
            self.compression,
            self.flush_batch_size,
            index_collector,
            self.high_watermark.clone(),
        ));

        Ok(Client { client, producer })
    }

    pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
        self.global_index_collector.as_ref()
    }

    #[cfg(test)]
    pub(crate) fn high_watermark(&self) -> &Arc<DashMap<Arc<KafkaProvider>, u64>> {
        &self.high_watermark
    }
}

#[cfg(test)]
impl ClientManager {
    /// Returns a controller client, used by tests for topic administration.
    pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
        self.client.controller_client().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use common_wal::test_util::run_test_with_kafka_wal;
    use tokio::sync::Barrier;

    use super::*;
    use crate::kafka::test_util::prepare;

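    /// Ensures sequential `get_or_insert` calls create a client for every topic.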
    #[tokio::test]
    async fn test_sequential() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
                // Assigns 512 regions to the topics in a round-robin manner.
                let region_topic = (0..512)
                    .map(|region_id| (region_id, &topics[region_id % topics.len()]))
                    .collect::<HashMap<_, _>>();

                // Gets all clients sequentially.
                for (_, topic) in region_topic {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    manager.get_or_insert(&provider).await.unwrap();
                }

                // Ensures a client exists for every topic.
                let client_pool = manager.instances.read().await;
                let all_exist = topics.iter().all(|topic| {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    client_pool.contains_key(&provider)
                });
                assert!(all_exist);
            })
        })
        .await;
    }

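    /// Ensures concurrent `get_or_insert` calls all succeed and every topic
    /// ends up with a cached client.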
    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
                // Assigns 512 regions to the topics in a round-robin manner.
                let region_topic = (0..512)
                    .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
                    .collect::<HashMap<_, _>>();

                let manager = Arc::new(manager);
                // The barrier releases all tasks at once to maximize contention.
                let barrier = Arc::new(Barrier::new(region_topic.len()));
                let tasks = region_topic
                    .into_values()
                    .map(|topic| {
                        let manager = manager.clone();
                        let barrier = barrier.clone();

                        tokio::spawn(async move {
                            barrier.wait().await;
                            let provider = Arc::new(KafkaProvider::new(topic));
                            assert!(manager.get_or_insert(&provider).await.is_ok());
                        })
                    })
                    .collect::<Vec<_>>();
                futures::future::try_join_all(tasks).await.unwrap();

                // Ensures a client exists for every topic.
                let client_pool = manager.instances.read().await;
                let all_exist = topics.iter().all(|topic| {
                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                    client_pool.contains_key(&provider)
                });
                assert!(all_exist);
            })
        })
        .await;
    }
}