common_meta/wal_provider/
topic_creator.rs1use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17 DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27 BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28 KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
29};
30
/// Index of the single partition that every partition-level operation in this module targets.
const DEFAULT_PARTITION: i32 = 0;

35pub struct KafkaTopicCreator {
37 client: Client,
38 num_partitions: i32,
40 replication_factor: i16,
42 create_topic_timeout: i32,
44}
45
46impl KafkaTopicCreator {
47 pub fn client(&self) -> &Client {
48 &self.client
49 }
50
51 async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
52 let controller = client
53 .controller_client()
54 .context(BuildKafkaCtrlClientSnafu)?;
55 match controller
56 .create_topic(
57 topic,
58 self.num_partitions,
59 self.replication_factor,
60 self.create_topic_timeout,
61 )
62 .await
63 {
64 Ok(_) => {
65 info!("Successfully created topic {}", topic);
66 Ok(())
67 }
68 Err(e) => {
69 if Self::is_topic_already_exist_err(&e) {
70 info!("The topic {} already exists", topic);
71 Ok(())
72 } else {
73 error!(e; "Failed to create a topic {}", topic);
74 Err(e).context(CreateKafkaWalTopicSnafu)
75 }
76 }
77 }
78 }
79
80 async fn prepare_topic(&self, topic: &String) -> Result<()> {
81 let partition_client = self.partition_client(topic).await?;
82 self.append_noop_record(topic, &partition_client).await?;
83 Ok(())
84 }
85
86 async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
88 self.client
89 .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
90 .await
91 .context(KafkaPartitionClientSnafu {
92 topic,
93 partition: DEFAULT_PARTITION,
94 })
95 }
96
97 async fn append_noop_record(
100 &self,
101 topic: &String,
102 partition_client: &PartitionClient,
103 ) -> Result<()> {
104 let end_offset = partition_client
105 .get_offset(OffsetAt::Latest)
106 .await
107 .with_context(|_| KafkaGetOffsetSnafu {
108 topic: topic.clone(),
109 partition: DEFAULT_PARTITION,
110 })?;
111 if end_offset > 0 {
112 return Ok(());
113 }
114
115 partition_client
116 .produce(
117 vec![Record {
118 key: None,
119 value: None,
120 timestamp: chrono::Utc::now(),
121 headers: Default::default(),
122 }],
123 Compression::Lz4,
124 )
125 .await
126 .context(ProduceRecordSnafu { topic })?;
127 debug!("Appended a noop record to topic {}", topic);
128
129 Ok(())
130 }
131
132 pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
134 let tasks = topics
135 .iter()
136 .map(|topic| async { self.create_topic(topic, &self.client).await })
137 .collect::<Vec<_>>();
138 futures::future::try_join_all(tasks).await.map(|_| ())
139 }
140
141 pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
145 let tasks = topics
147 .iter()
148 .map(|topic| async { self.prepare_topic(topic).await })
149 .collect::<Vec<_>>();
150 futures::future::try_join_all(tasks).await.map(|_| ())
151 }
152
153 fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
154 matches!(
155 e,
156 &RsKafkaError::ServerError {
157 protocol_error: TopicAlreadyExists,
158 ..
159 }
160 )
161 }
162}
163
#[cfg(test)]
impl KafkaTopicCreator {
    /// Deletes all `topics` concurrently. Topics that do not exist are skipped.
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
            .map(|topic| self.delete_topic(topic, &self.client));
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

    /// Deletes `topic` via the controller of `client`.
    ///
    /// An `UnknownTopicOrPartition` server error counts as success.
    ///
    /// # Errors
    ///
    /// Returns an error if the controller client cannot be built.
    ///
    /// # Panics
    ///
    /// Panics on any other deletion failure. This is test-only code, so failing loudly is
    /// intended.
    async fn delete_topic(&self, topic: &str, client: &Client) -> Result<()> {
        let controller = client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        // NOTE(review): `10` is the second argument of rskafka's `delete_topic`, presumably the
        // timeout in milliseconds — confirm.
        match controller.delete_topic(topic, 10).await {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
                Ok(())
            }
            Err(e) if Self::is_unknown_topic_err(&e) => {
                info!("The topic {} does not exist", topic);
                Ok(())
            }
            Err(e) => panic!("Failed to delete a topic {}, error: {}", topic, e),
        }
    }

    /// Returns true if `e` is an `UnknownTopicOrPartition` server error.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        matches!(
            e,
            &RsKafkaError::ServerError {
                protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
                ..
            }
        )
    }

    /// Returns a partition client for [`DEFAULT_PARTITION`] of `topic`.
    ///
    /// # Panics
    ///
    /// Panics, naming the topic, if the partition client cannot be built.
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic)
            .await
            .unwrap_or_else(|e| panic!("Failed to get partition client for topic {topic}: {e:?}"))
    }
}

209pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
211 let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
213 .backoff_config(DEFAULT_BACKOFF_CONFIG)
214 .connect_timeout(Some(connection.connect_timeout))
215 .timeout(Some(connection.timeout));
216 if let Some(sasl) = &connection.sasl {
217 builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
218 };
219 if let Some(tls) = &connection.tls {
220 builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
221 };
222 builder
223 .build()
224 .await
225 .with_context(|_| BuildKafkaClientSnafu {
226 broker_endpoints: connection.broker_endpoints.clone(),
227 })
228}
229
230pub async fn build_kafka_topic_creator(
232 connection: &KafkaConnectionConfig,
233 kafka_topic: &KafkaTopicConfig,
234) -> Result<KafkaTopicCreator> {
235 let client = build_kafka_client(connection).await?;
236 Ok(KafkaTopicCreator {
237 client,
238 num_partitions: kafka_topic.num_partitions,
239 replication_factor: kafka_topic.replication_factor,
240 create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
241 })
242}
243
#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a topic creator for `broker_endpoints`, with default settings everywhere else.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        build_kafka_topic_creator(&connection, &KafkaTopicConfig::default())
            .await
            .unwrap()
    }

    /// Builds a creator against the test brokers and returns it with a freshly created topic.
    ///
    /// The topic is named `{prefix}0`; any existing topic with that name is deleted first.
    async fn setup_fresh_topic(prefix: &str) -> (KafkaTopicCreator, String) {
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);
        creator.delete_topics(topics).await.unwrap();
        creator.create_topics(topics).await.unwrap();
        (creator, topic)
    }

    /// Returns the latest offset of the partition, panicking on failure.
    async fn latest_offset(partition_client: &PartitionClient) -> i64 {
        partition_client.get_offset(OffsetAt::Latest).await.unwrap()
    }

    /// Returns the earliest offset of the partition, panicking on failure.
    async fn earliest_offset(partition_client: &PartitionClient) -> i64 {
        partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap()
    }

    /// Produces `num_records` keyed test records, one produce request per record.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for idx in 0..num_records {
            let record = Record {
                key: Some(b"test".to_vec()),
                value: Some(format!("test {}", idx).as_bytes().to_vec()),
                timestamp: chrono::Utc::now(),
                headers: Default::default(),
            };
            partition_client
                .produce(vec![record], Compression::Lz4)
                .await
                .unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        // The skip macro must stay in the test body so its early return exits the test.
        maybe_skip_kafka_integration_test!();
        let (creator, topic) = setup_fresh_topic("append_noop_record_to_empty_topic").await;

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(latest_offset(&partition_client).await, 0);

        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        assert_eq!(latest_offset(&partition_client).await, 1);
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let (creator, topic) = setup_fresh_topic("append_noop_record_to_non_empty_topic").await;

        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();
        assert_eq!(latest_offset(&partition_client).await, 2);

        // A non-empty partition must not receive a noop record.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        assert_eq!(latest_offset(&partition_client).await, 2);
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let (creator, topic) = setup_fresh_topic("create_topic").await;

        // Creating an already-existing topic again must succeed.
        creator
            .create_topics(std::slice::from_ref(&topic))
            .await
            .unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(latest_offset(&partition_client).await, 0);
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let (creator, topic) = setup_fresh_topic("prepare_topic").await;

        creator.prepare_topic(&topic).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(earliest_offset(&partition_client).await, 0);
        assert_eq!(latest_offset(&partition_client).await, 1);
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let (creator, topic) =
            setup_fresh_topic("prepare_topic_with_stale_records_without_pruning").await;

        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        assert_eq!(latest_offset(&partition_client).await, 10);
        assert_eq!(earliest_offset(&partition_client).await, 0);
    }
}