// common_meta/wal_options_allocator/topic_creator.rs
use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17 DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27 BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28 KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
29};
30
/// Partition used for offset lookups and noop appends; this module always
/// operates on partition 0 of a topic.
const DEFAULT_PARTITION: i32 = 0;
34
/// Creates Kafka topics and prepares them for use as WAL storage.
pub struct KafkaTopicCreator {
    // Kafka client used for both admin calls (topic creation/deletion) and
    // data calls (offset lookups, noop appends).
    client: Client,
    // Number of partitions each created topic gets.
    num_partitions: i32,
    // Replication factor for each created topic.
    replication_factor: i16,
    // Timeout forwarded to Kafka's CreateTopics request, in milliseconds.
    create_topic_timeout: i32,
}
45
46impl KafkaTopicCreator {
47 pub fn client(&self) -> &Client {
48 &self.client
49 }
50
51 async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
52 let controller = client
53 .controller_client()
54 .context(BuildKafkaCtrlClientSnafu)?;
55 match controller
56 .create_topic(
57 topic,
58 self.num_partitions,
59 self.replication_factor,
60 self.create_topic_timeout,
61 )
62 .await
63 {
64 Ok(_) => {
65 info!("Successfully created topic {}", topic);
66 Ok(())
67 }
68 Err(e) => {
69 if Self::is_topic_already_exist_err(&e) {
70 info!("The topic {} already exists", topic);
71 Ok(())
72 } else {
73 error!(e; "Failed to create a topic {}", topic);
74 Err(e).context(CreateKafkaWalTopicSnafu)
75 }
76 }
77 }
78 }
79
80 async fn prepare_topic(&self, topic: &String) -> Result<()> {
81 let partition_client = self.partition_client(topic).await?;
82 self.append_noop_record(topic, &partition_client).await?;
83 Ok(())
84 }
85
86 async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
88 self.client
89 .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
90 .await
91 .context(KafkaPartitionClientSnafu {
92 topic,
93 partition: DEFAULT_PARTITION,
94 })
95 }
96
97 async fn append_noop_record(
100 &self,
101 topic: &String,
102 partition_client: &PartitionClient,
103 ) -> Result<()> {
104 let end_offset = partition_client
105 .get_offset(OffsetAt::Latest)
106 .await
107 .context(KafkaGetOffsetSnafu {
108 topic: topic.to_string(),
109 partition: DEFAULT_PARTITION,
110 })?;
111 if end_offset > 0 {
112 return Ok(());
113 }
114
115 partition_client
116 .produce(
117 vec![Record {
118 key: None,
119 value: None,
120 timestamp: chrono::Utc::now(),
121 headers: Default::default(),
122 }],
123 Compression::Lz4,
124 )
125 .await
126 .context(ProduceRecordSnafu { topic })?;
127 debug!("Appended a noop record to topic {}", topic);
128
129 Ok(())
130 }
131
132 pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
134 let tasks = topics
135 .iter()
136 .map(|topic| async { self.create_topic(topic, &self.client).await })
137 .collect::<Vec<_>>();
138 futures::future::try_join_all(tasks).await.map(|_| ())
139 }
140
141 pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
145 let tasks = topics
147 .iter()
148 .map(|topic| async { self.prepare_topic(topic).await })
149 .collect::<Vec<_>>();
150 futures::future::try_join_all(tasks).await.map(|_| ())
151 }
152
153 fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
154 matches!(
155 e,
156 &RsKafkaError::ServerError {
157 protocol_error: TopicAlreadyExists,
158 ..
159 }
160 )
161 }
162}
163
#[cfg(test)]
impl KafkaTopicCreator {
    /// Deletes all `topics` concurrently, ignoring topics that do not exist.
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let futures = topics
            .iter()
            .map(|topic| self.delete_topic(topic, &self.client))
            .collect::<Vec<_>>();
        futures::future::try_join_all(futures).await?;
        Ok(())
    }

    /// Deletes a single topic; a missing topic is not an error, any other
    /// failure aborts the test run.
    async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
        let controller = client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        let outcome = controller.delete_topic(topic, 10).await;
        match outcome {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
                Ok(())
            }
            Err(e) if Self::is_unknown_topic_err(&e) => {
                info!("The topic {} does not exist", topic);
                Ok(())
            }
            Err(e) => panic!("Failed to delete a topic {}, error: {}", topic, e),
        }
    }

    /// Returns true if `e` is the broker reporting an unknown topic/partition.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        if let RsKafkaError::ServerError { protocol_error, .. } = e {
            matches!(
                protocol_error,
                rskafka::client::error::ProtocolError::UnknownTopicOrPartition
            )
        } else {
            false
        }
    }

    /// Test helper: like `partition_client` but panics on failure.
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic).await.unwrap()
    }
}
208pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
210 let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
212 .backoff_config(DEFAULT_BACKOFF_CONFIG);
213 if let Some(sasl) = &connection.sasl {
214 builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
215 };
216 if let Some(tls) = &connection.tls {
217 builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
218 };
219 builder
220 .build()
221 .await
222 .with_context(|_| BuildKafkaClientSnafu {
223 broker_endpoints: connection.broker_endpoints.clone(),
224 })
225}
226
227pub async fn build_kafka_topic_creator(
229 connection: &KafkaConnectionConfig,
230 kafka_topic: &KafkaTopicConfig,
231) -> Result<KafkaTopicCreator> {
232 let client = build_kafka_client(connection).await?;
233 Ok(KafkaTopicCreator {
234 client,
235 num_partitions: kafka_topic.num_partitions,
236 replication_factor: kafka_topic.replication_factor,
237 create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
238 })
239}
240
#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a creator against the given brokers with the default topic config.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        let kafka_topic = KafkaTopicConfig::default();
        build_kafka_topic_creator(&connection, &kafka_topic)
            .await
            .unwrap()
    }

    /// Produces `num_records` small records to the partition, one at a time.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            let record = Record {
                key: Some(b"test".to_vec()),
                value: Some(format!("test {}", i).into_bytes()),
                timestamp: chrono::Utc::now(),
                headers: Default::default(),
            };
            partition_client
                .produce(vec![record], Compression::Lz4)
                .await
                .unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = format!("{}{}", "append_noop_record_to_empty_topic", "0");

        // Start from a clean slate.
        creator.delete_topics(&[topic.clone()]).await.unwrap();
        creator.create_topics(&[topic.clone()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(
            partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
            0
        );

        // The noop record bumps the end offset of an empty topic to 1.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        assert_eq!(
            partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
            1
        );
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = format!("{}{}", "append_noop_record_to_non_empty_topic", "0");

        creator.delete_topics(&[topic.clone()]).await.unwrap();
        creator.create_topics(&[topic.clone()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();
        assert_eq!(
            partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
            2
        );

        // The noop append is skipped for a non-empty topic: offset stays at 2.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        assert_eq!(
            partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
            2
        );
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = format!("{}{}", "create_topic", "0");

        creator.delete_topics(&[topic.clone()]).await.unwrap();

        // Creating the same topic twice succeeds: "already exists" is not an error.
        creator.create_topics(&[topic.clone()]).await.unwrap();
        creator.create_topics(&[topic.clone()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(
            partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
            0
        );
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = format!("{}{}", "prepare_topic", "0");

        creator.delete_topics(&[topic.clone()]).await.unwrap();
        creator.create_topics(&[topic.clone()]).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        // Preparing an empty topic appends exactly one (noop) record.
        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(
            partition_client
                .get_offset(OffsetAt::Earliest)
                .await
                .unwrap(),
            0
        );
        assert_eq!(
            partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
            1
        );
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = format!(
            "{}{}",
            "prepare_topic_with_stale_records_without_pruning", "0"
        );

        creator.delete_topics(&[topic.clone()]).await.unwrap();
        creator.create_topics(&[topic.clone()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        // A non-empty topic is left untouched: no noop record, no pruning.
        creator.prepare_topic(&topic).await.unwrap();
        assert_eq!(
            partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
            10
        );
        assert_eq!(
            partition_client
                .get_offset(OffsetAt::Earliest)
                .await
                .unwrap(),
            0
        );
    }
}
402}