common_meta/wal_options_allocator/topic_creator.rs

use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{
    DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
};
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use snafu::ResultExt;

use crate::error::{
    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
    KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
};

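/// The default partition that noop records are appended to and offsets are read from.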
const DEFAULT_PARTITION: i32 = 0;

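/// Creates and prepares Kafka topics for the WAL.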
pub struct KafkaTopicCreator {
    /// The Kafka client.
    client: Client,
    /// The number of partitions per topic.
    num_partitions: i32,
    /// The replication factor of each topic.
    replication_factor: i16,
    /// The timeout for topic creation, in milliseconds.
    create_topic_timeout: i32,
}

impl KafkaTopicCreator {
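    /// Returns a reference to the underlying Kafka [`Client`].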
    pub fn client(&self) -> &Client {
        &self.client
    }

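    /// Creates `topic` using the configured number of partitions, replication factor, and
    /// creation timeout. A `TopicAlreadyExists` error from the broker is treated as success.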
    async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
        let controller = client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        match controller
            .create_topic(
                topic,
                self.num_partitions,
                self.replication_factor,
                self.create_topic_timeout,
            )
            .await
        {
            Ok(_) => {
                info!("Successfully created topic {}", topic);
                Ok(())
            }
            Err(e) => {
                if Self::is_topic_already_exist_err(&e) {
                    info!("The topic {} already exists", topic);
                    Ok(())
                } else {
                    error!(e; "Failed to create a topic {}", topic);
                    Err(e).context(CreateKafkaWalTopicSnafu)
                }
            }
        }
    }

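    /// Prepares `topic`: obtains a partition client and appends a noop record if the topic
    /// is empty.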
    async fn prepare_topic(&self, topic: &String) -> Result<()> {
        let partition_client = self.partition_client(topic).await?;
        self.append_noop_record(topic, &partition_client).await?;
        Ok(())
    }

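    /// Creates a [`PartitionClient`] for the default partition of `topic`, retrying while
    /// the topic is still unknown to the broker.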
    async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
        self.client
            .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
            .await
            .context(KafkaPartitionClientSnafu {
                topic,
                partition: DEFAULT_PARTITION,
            })
    }

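    /// Appends a noop record (empty key and value) to `topic` so that its latest offset
    /// becomes non-zero. Does nothing if the topic already contains records.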
    async fn append_noop_record(
        &self,
        topic: &String,
        partition_client: &PartitionClient,
    ) -> Result<()> {
        let end_offset = partition_client
            .get_offset(OffsetAt::Latest)
            .await
            .with_context(|_| KafkaGetOffsetSnafu {
                topic: topic.clone(),
                partition: DEFAULT_PARTITION,
            })?;
        if end_offset > 0 {
            return Ok(());
        }

        partition_client
            .produce(
                vec![Record {
                    key: None,
                    value: None,
                    timestamp: chrono::Utc::now(),
                    headers: Default::default(),
                }],
                Compression::Lz4,
            )
            .await
            .context(ProduceRecordSnafu { topic })?;
        debug!("Appended a noop record to topic {}", topic);

        Ok(())
    }

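    /// Creates all `topics` concurrently.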
    pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
            .map(|topic| async { self.create_topic(topic, &self.client).await })
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

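    /// Prepares all `topics` concurrently (see `Self::prepare_topic`).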
    pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
            .map(|topic| async { self.prepare_topic(topic).await })
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

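    /// Returns true if the error indicates that the topic already exists.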
    fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
        matches!(
            e,
            &RsKafkaError::ServerError {
                protocol_error: TopicAlreadyExists,
                ..
            }
        )
    }
}

#[cfg(test)]
impl KafkaTopicCreator {
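    /// Deletes all `topics` concurrently.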
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
            .map(|topic| async { self.delete_topic(topic, &self.client).await })
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

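    /// Deletes `topic`, treating an unknown topic as success and panicking on any other error.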
    async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
        let controller = client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        match controller.delete_topic(topic, 10).await {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
                Ok(())
            }
            Err(e) => {
                if Self::is_unknown_topic_err(&e) {
                    info!("The topic {} does not exist", topic);
                    Ok(())
                } else {
                    panic!("Failed to delete a topic {}, error: {}", topic, e);
                }
            }
        }
    }

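    /// Returns true if the error indicates an unknown topic or partition.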
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        matches!(
            e,
            &RsKafkaError::ServerError {
                protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
                ..
            }
        )
    }

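    /// Returns a [`PartitionClient`] for `topic`, panicking on failure.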
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic).await.unwrap()
    }
}

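/// Builds a Kafka [`Client`] from `connection`, applying the default backoff config and the
/// optional SASL and TLS settings.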
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
    let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
        .backoff_config(DEFAULT_BACKOFF_CONFIG);
    if let Some(sasl) = &connection.sasl {
        builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
    };
    if let Some(tls) = &connection.tls {
        builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
    };
    builder
        .build()
        .await
        .with_context(|_| BuildKafkaClientSnafu {
            broker_endpoints: connection.broker_endpoints.clone(),
        })
}

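/// Builds a [`KafkaTopicCreator`] from the given connection and topic configs.
///
/// A minimal usage sketch (illustrative only; the broker endpoint and topic name are
/// hypothetical, not values defined by this crate):
///
/// ```ignore
/// // Hypothetical broker endpoint and topic name, for illustration only.
/// let connection = KafkaConnectionConfig {
///     broker_endpoints: vec!["127.0.0.1:9092".to_string()],
///     ..Default::default()
/// };
/// let creator = build_kafka_topic_creator(&connection, &KafkaTopicConfig::default()).await?;
/// let topics = vec!["my_wal_topic".to_string()];
/// // Create the topics, then append a noop record to each empty one.
/// creator.create_topics(&topics).await?;
/// creator.prepare_topics(&topics).await?;
/// ```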
pub async fn build_kafka_topic_creator(
    connection: &KafkaConnectionConfig,
    kafka_topic: &KafkaTopicConfig,
) -> Result<KafkaTopicCreator> {
    let client = build_kafka_client(connection).await?;
    Ok(KafkaTopicCreator {
        client,
        num_partitions: kafka_topic.num_partitions,
        replication_factor: kafka_topic.replication_factor,
        create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
    })
}

#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        let kafka_topic = KafkaTopicConfig::default();

        build_kafka_topic_creator(&connection, &kafka_topic)
            .await
            .unwrap()
    }

    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            partition_client
                .produce(
                    vec![Record {
                        key: Some(b"test".to_vec()),
                        value: Some(format!("test {}", i).as_bytes().to_vec()),
                        timestamp: chrono::Utc::now(),
                        headers: Default::default(),
                    }],
                    Compression::Lz4,
                )
                .await
                .unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Ensure the topic does not exist, then create it.
        creator.delete_topics(topics).await.unwrap();
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);

        // Appending a noop record to an empty topic advances the latest offset to 1.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_non_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Ensure the topic does not exist before the test.
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);

        // The topic is not empty, so no noop record is appended and the offset is unchanged.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "create_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Ensure the topic does not exist before the test.
        creator.delete_topics(topics).await.unwrap();

        // Creating the same topic twice should succeed; the second call is a no-op.
        creator.create_topics(topics).await.unwrap();
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "prepare_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Ensure the topic does not exist before the test.
        creator.delete_topics(topics).await.unwrap();

        // Preparing a freshly created topic appends exactly one noop record.
        creator.create_topics(topics).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let prefix = "prepare_topic_with_stale_records_without_pruning";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Ensure the topic does not exist before the test.
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        // The topic already contains records, so prepare_topic appends nothing and the
        // existing records are kept.
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 10);
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);
    }
}