// common_meta/wal_options_allocator/topic_creator.rs
use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17 KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27 BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28 KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
29 Result, TlsConfigSnafu,
30};
31
/// Partition index used for all per-topic partition operations (offset queries and
/// noop-record appends); WAL topics are always accessed through partition 0.
const DEFAULT_PARTITION: i32 = 0;
/// Creates and prepares Kafka topics used as remote WAL storage.
pub struct KafkaTopicCreator {
    /// Kafka client shared by all topic operations.
    client: Client,
    /// Number of partitions each created topic gets.
    num_partitions: i32,
    /// Replication factor for each created topic.
    replication_factor: i16,
    /// Timeout for topic creation, in milliseconds (populated from
    /// `create_topic_timeout.as_millis()` when the creator is built).
    create_topic_timeout: i32,
}
46
47impl KafkaTopicCreator {
48 pub fn client(&self) -> &Client {
49 &self.client
50 }
51
52 async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
53 let controller = client
54 .controller_client()
55 .context(BuildKafkaCtrlClientSnafu)?;
56 match controller
57 .create_topic(
58 topic,
59 self.num_partitions,
60 self.replication_factor,
61 self.create_topic_timeout,
62 )
63 .await
64 {
65 Ok(_) => {
66 info!("Successfully created topic {}", topic);
67 Ok(())
68 }
69 Err(e) => {
70 if Self::is_topic_already_exist_err(&e) {
71 info!("The topic {} already exists", topic);
72 Ok(())
73 } else {
74 error!(e; "Failed to create a topic {}", topic);
75 Err(e).context(CreateKafkaWalTopicSnafu)
76 }
77 }
78 }
79 }
80
81 async fn prepare_topic(&self, topic: &String) -> Result<()> {
82 let partition_client = self.partition_client(topic).await?;
83 self.append_noop_record(topic, &partition_client).await?;
84 Ok(())
85 }
86
87 async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
89 self.client
90 .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
91 .await
92 .context(KafkaPartitionClientSnafu {
93 topic,
94 partition: DEFAULT_PARTITION,
95 })
96 }
97
98 async fn append_noop_record(
101 &self,
102 topic: &String,
103 partition_client: &PartitionClient,
104 ) -> Result<()> {
105 let end_offset = partition_client
106 .get_offset(OffsetAt::Latest)
107 .await
108 .context(KafkaGetOffsetSnafu {
109 topic: topic.to_string(),
110 partition: DEFAULT_PARTITION,
111 })?;
112 if end_offset > 0 {
113 return Ok(());
114 }
115
116 partition_client
117 .produce(
118 vec![Record {
119 key: None,
120 value: None,
121 timestamp: chrono::Utc::now(),
122 headers: Default::default(),
123 }],
124 Compression::Lz4,
125 )
126 .await
127 .context(ProduceRecordSnafu { topic })?;
128 debug!("Appended a noop record to topic {}", topic);
129
130 Ok(())
131 }
132
133 pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
135 let tasks = topics
136 .iter()
137 .map(|topic| async { self.create_topic(topic, &self.client).await })
138 .collect::<Vec<_>>();
139 futures::future::try_join_all(tasks).await.map(|_| ())
140 }
141
142 pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
146 let tasks = topics
148 .iter()
149 .map(|topic| async { self.prepare_topic(topic).await })
150 .collect::<Vec<_>>();
151 futures::future::try_join_all(tasks).await.map(|_| ())
152 }
153
154 fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
155 matches!(
156 e,
157 &RsKafkaError::ServerError {
158 protocol_error: TopicAlreadyExists,
159 ..
160 }
161 )
162 }
163}
164
#[cfg(test)]
impl KafkaTopicCreator {
    /// Deletes all `topics` concurrently; missing topics are ignored.
    /// Test-only helper used to reset broker state between runs.
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
            // `delete_topic` is already async; map directly to its future.
            .map(|topic| self.delete_topic(topic, &self.client))
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

    /// Deletes `topic`, treating "unknown topic" as success so repeated test runs
    /// start from a clean state. Panics on any other error (test-only code).
    async fn delete_topic(&self, topic: &str, client: &Client) -> Result<()> {
        let controller = client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        // NOTE(review): `10` is the timeout forwarded to rskafka's `delete_topic`;
        // presumably milliseconds — confirm against the rskafka API if it matters.
        match controller.delete_topic(topic, 10).await {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
                Ok(())
            }
            Err(e) => {
                if Self::is_unknown_topic_err(&e) {
                    info!("The topic {} does not exist", topic);
                    Ok(())
                } else {
                    // Fail loudly in tests rather than propagating the error.
                    panic!("Failed to delete a topic {}, error: {}", topic, e);
                }
            }
        }
    }

    /// Returns true if `e` is the broker's "unknown topic or partition" protocol error.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        matches!(
            e,
            &RsKafkaError::ServerError {
                protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
                ..
            }
        )
    }

    /// Returns a partition client for `topic`, panicking on failure. Test-only helper.
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic).await.unwrap()
    }
}
209pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
211 let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
213 .await
214 .context(ResolveKafkaEndpointSnafu)?;
215 let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
216 if let Some(sasl) = &connection.sasl {
217 builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
218 };
219 if let Some(tls) = &connection.tls {
220 builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
221 };
222 builder
223 .build()
224 .await
225 .with_context(|_| BuildKafkaClientSnafu {
226 broker_endpoints: connection.broker_endpoints.clone(),
227 })
228}
229
230pub async fn build_kafka_topic_creator(
232 connection: &KafkaConnectionConfig,
233 kafka_topic: &KafkaTopicConfig,
234) -> Result<KafkaTopicCreator> {
235 let client = build_kafka_client(connection).await?;
236 Ok(KafkaTopicCreator {
237 client,
238 num_partitions: kafka_topic.num_partitions,
239 replication_factor: kafka_topic.replication_factor,
240 create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
241 })
242}
243
#[cfg(test)]
mod tests {
    //! Kafka integration tests; skipped unless the kafka test endpoints are
    //! configured (see `maybe_skip_kafka_integration_test!`).
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a `KafkaTopicCreator` against the given brokers with default topic config.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        let kafka_topic = KafkaTopicConfig::default();

        build_kafka_topic_creator(&connection, &kafka_topic)
            .await
            .unwrap()
    }

    /// Produces `num_records` small records to the partition, one produce call each,
    /// advancing the latest offset by `num_records`.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            partition_client
                .produce(
                    vec![Record {
                        key: Some(b"test".to_vec()),
                        value: Some(format!("test {}", i).as_bytes().to_vec()),
                        timestamp: chrono::Utc::now(),
                        headers: Default::default(),
                    }],
                    Compression::Lz4,
                )
                .await
                .unwrap();
        }
        Ok(())
    }

    /// A noop record appended to a freshly created (empty) topic bumps the latest
    /// offset from 0 to 1.
    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Delete-then-create resets the topic in case a previous run left state behind.
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        creator.create_topics(&[topic.to_string()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);

        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    /// A noop record is NOT appended when the topic already holds records: the
    /// latest offset stays unchanged.
    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_non_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);

        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        // Offset unchanged: the noop append is skipped for non-empty topics.
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);
    }

    /// Creating the same topic twice succeeds (idempotent create) and leaves the
    /// topic empty.
    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "create_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        // Second create hits the "already exists" path and must still succeed.
        creator.create_topics(&[topic.to_string()]).await.unwrap();
        creator.create_topics(&[topic.to_string()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);
    }

    /// Preparing a fresh topic appends exactly one noop record: earliest offset 0,
    /// latest offset 1.
    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "prepare_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    /// Preparing a topic that already has records (and no pruning) leaves offsets
    /// untouched: no noop record is appended.
    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let prefix = "prepare_topic_with_stale_records_without_pruning";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 10);
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);
    }
}