common_meta/wal_options_allocator/
topic_creator.rs1use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17 DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, KafkaConnectionConfig, KafkaTopicConfig,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27 BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28 KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
29};
30
/// Partition index used for every partition client and offset lookup in this file.
const DEFAULT_PARTITION: i32 = 0;
34
35pub struct KafkaTopicCreator {
37 client: Client,
38 num_partitions: i32,
40 replication_factor: i16,
42 create_topic_timeout: i32,
44}
45
46impl KafkaTopicCreator {
47 pub fn client(&self) -> &Client {
48 &self.client
49 }
50
51 async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
52 let controller = client
53 .controller_client()
54 .context(BuildKafkaCtrlClientSnafu)?;
55 match controller
56 .create_topic(
57 topic,
58 self.num_partitions,
59 self.replication_factor,
60 self.create_topic_timeout,
61 )
62 .await
63 {
64 Ok(_) => {
65 info!("Successfully created topic {}", topic);
66 Ok(())
67 }
68 Err(e) => {
69 if Self::is_topic_already_exist_err(&e) {
70 info!("The topic {} already exists", topic);
71 Ok(())
72 } else {
73 error!(e; "Failed to create a topic {}", topic);
74 Err(e).context(CreateKafkaWalTopicSnafu)
75 }
76 }
77 }
78 }
79
80 async fn prepare_topic(&self, topic: &String) -> Result<()> {
81 let partition_client = self.partition_client(topic).await?;
82 self.append_noop_record(topic, &partition_client).await?;
83 Ok(())
84 }
85
86 async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
88 self.client
89 .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
90 .await
91 .context(KafkaPartitionClientSnafu {
92 topic,
93 partition: DEFAULT_PARTITION,
94 })
95 }
96
97 async fn append_noop_record(
100 &self,
101 topic: &String,
102 partition_client: &PartitionClient,
103 ) -> Result<()> {
104 let end_offset = partition_client
105 .get_offset(OffsetAt::Latest)
106 .await
107 .with_context(|_| KafkaGetOffsetSnafu {
108 topic: topic.clone(),
109 partition: DEFAULT_PARTITION,
110 })?;
111 if end_offset > 0 {
112 return Ok(());
113 }
114
115 partition_client
116 .produce(
117 vec![Record {
118 key: None,
119 value: None,
120 timestamp: chrono::Utc::now(),
121 headers: Default::default(),
122 }],
123 Compression::Lz4,
124 )
125 .await
126 .context(ProduceRecordSnafu { topic })?;
127 debug!("Appended a noop record to topic {}", topic);
128
129 Ok(())
130 }
131
132 pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
134 let tasks = topics
135 .iter()
136 .map(|topic| async { self.create_topic(topic, &self.client).await })
137 .collect::<Vec<_>>();
138 futures::future::try_join_all(tasks).await.map(|_| ())
139 }
140
141 pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
145 let tasks = topics
147 .iter()
148 .map(|topic| async { self.prepare_topic(topic).await })
149 .collect::<Vec<_>>();
150 futures::future::try_join_all(tasks).await.map(|_| ())
151 }
152
153 fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
154 matches!(
155 e,
156 &RsKafkaError::ServerError {
157 protocol_error: TopicAlreadyExists,
158 ..
159 }
160 )
161 }
162}
163
#[cfg(test)]
impl KafkaTopicCreator {
    /// Deletes all `topics` concurrently; test-only helper.
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let deletions = topics
            .iter()
            .map(|topic| self.delete_topic(topic, &self.client));
        futures::future::try_join_all(deletions).await.map(|_| ())
    }

    /// Deletes `topic` through the controller client of `client`.
    ///
    /// An "unknown topic or partition" server error is logged and treated as success.
    ///
    /// # Panics
    ///
    /// Panics on any other deletion error.
    async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
        let controller = client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;

        if let Err(err) = controller.delete_topic(topic, 10).await {
            if !Self::is_unknown_topic_err(&err) {
                panic!("Failed to delete a topic {}, error: {}", topic, err);
            }
            info!("The topic {} does not exist", topic);
            return Ok(());
        }

        info!("Successfully deleted topic {}", topic);
        Ok(())
    }

    /// Returns `true` when `e` is a server error whose protocol error is
    /// `UnknownTopicOrPartition`.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        use rskafka::client::error::ProtocolError::UnknownTopicOrPartition;

        matches!(
            e,
            RsKafkaError::ServerError {
                protocol_error: UnknownTopicOrPartition,
                ..
            }
        )
    }

    /// Returns the partition client for `topic`.
    ///
    /// # Panics
    ///
    /// Panics if the partition client cannot be built.
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        let client = self.partition_client(topic).await;
        client.unwrap()
    }
}
208
209pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
211 let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
213 .backoff_config(DEFAULT_BACKOFF_CONFIG)
214 .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT));
215 if let Some(sasl) = &connection.sasl {
216 builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
217 };
218 if let Some(tls) = &connection.tls {
219 builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
220 };
221 builder
222 .build()
223 .await
224 .with_context(|_| BuildKafkaClientSnafu {
225 broker_endpoints: connection.broker_endpoints.clone(),
226 })
227}
228
229pub async fn build_kafka_topic_creator(
231 connection: &KafkaConnectionConfig,
232 kafka_topic: &KafkaTopicConfig,
233) -> Result<KafkaTopicCreator> {
234 let client = build_kafka_client(connection).await?;
235 Ok(KafkaTopicCreator {
236 client,
237 num_partitions: kafka_topic.num_partitions,
238 replication_factor: kafka_topic.replication_factor,
239 create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
240 })
241}
242
#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a topic creator for `broker_endpoints` with otherwise default settings.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };

        build_kafka_topic_creator(&connection, &KafkaTopicConfig::default())
            .await
            .unwrap()
    }

    /// Produces `num_records` keyed records, one per request, panicking on any failure.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            let record = Record {
                key: Some(b"test".to_vec()),
                value: Some(format!("test {}", i).into_bytes()),
                timestamp: chrono::Utc::now(),
                headers: Default::default(),
            };
            partition_client
                .produce(vec![record], Compression::Lz4)
                .await
                .unwrap();
        }
        Ok(())
    }

    /// Deletes any existing topic named `<prefix>0`, creates it again, and returns its name.
    async fn recreate_topic(creator: &KafkaTopicCreator, prefix: &str) -> String {
        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        creator.delete_topics(topics).await.unwrap();
        creator.create_topics(topics).await.unwrap();

        topic
    }

    /// Fetches the requested offset of the partition, panicking on failure.
    async fn offset_at(partition_client: &PartitionClient, at: OffsetAt) -> i64 {
        partition_client.get_offset(at).await.unwrap()
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = recreate_topic(&creator, "append_noop_record_to_empty_topic").await;

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(offset_at(&partition_client, OffsetAt::Latest).await, 0);

        // The partition is empty, so exactly one noop record must be appended.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        assert_eq!(offset_at(&partition_client, OffsetAt::Latest).await, 1);
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = recreate_topic(&creator, "append_noop_record_to_non_empty_topic").await;

        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();
        assert_eq!(offset_at(&partition_client, OffsetAt::Latest).await, 2);

        // The partition already holds records, so the offset must not move.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        assert_eq!(offset_at(&partition_client, OffsetAt::Latest).await, 2);
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = recreate_topic(&creator, "create_topic").await;

        // Creating a topic that already exists must succeed.
        creator
            .create_topics(std::slice::from_ref(&topic))
            .await
            .unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(offset_at(&partition_client, OffsetAt::Latest).await, 0);
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic = recreate_topic(&creator, "prepare_topic").await;

        creator.prepare_topic(&topic).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        assert_eq!(offset_at(&partition_client, OffsetAt::Earliest).await, 0);
        assert_eq!(offset_at(&partition_client, OffsetAt::Latest).await, 1);
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let creator = test_topic_creator(get_kafka_endpoints()).await;
        let topic =
            recreate_topic(&creator, "prepare_topic_with_stale_records_without_pruning").await;

        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        assert_eq!(offset_at(&partition_client, OffsetAt::Latest).await, 10);
        assert_eq!(offset_at(&partition_client, OffsetAt::Earliest).await, 0);
    }
}