common_meta/wal_options_allocator/topic_creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17    DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28    KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
29};
30
// Each topic only has one partition for now, so every read and write in this
// module goes through the partition at index `DEFAULT_PARTITION`.
const DEFAULT_PARTITION: i32 = 0;
34
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
    /// The rskafka client shared by the controller operations (topic
    /// creation/deletion) and partition operations (offset queries, produce).
    client: Client,
    /// The number of partitions per topic.
    num_partitions: i32,
    /// The replication factor of each topic.
    replication_factor: i16,
    /// The timeout of topic creation in milliseconds, forwarded to
    /// rskafka's `create_topic` call.
    create_topic_timeout: i32,
}
45
46impl KafkaTopicCreator {
47    pub fn client(&self) -> &Client {
48        &self.client
49    }
50
51    async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
52        let controller = client
53            .controller_client()
54            .context(BuildKafkaCtrlClientSnafu)?;
55        match controller
56            .create_topic(
57                topic,
58                self.num_partitions,
59                self.replication_factor,
60                self.create_topic_timeout,
61            )
62            .await
63        {
64            Ok(_) => {
65                info!("Successfully created topic {}", topic);
66                Ok(())
67            }
68            Err(e) => {
69                if Self::is_topic_already_exist_err(&e) {
70                    info!("The topic {} already exists", topic);
71                    Ok(())
72                } else {
73                    error!(e; "Failed to create a topic {}", topic);
74                    Err(e).context(CreateKafkaWalTopicSnafu)
75                }
76            }
77        }
78    }
79
80    async fn prepare_topic(&self, topic: &String) -> Result<()> {
81        let partition_client = self.partition_client(topic).await?;
82        self.append_noop_record(topic, &partition_client).await?;
83        Ok(())
84    }
85
86    /// Creates a [PartitionClient] for the given topic.
87    async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
88        self.client
89            .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
90            .await
91            .context(KafkaPartitionClientSnafu {
92                topic,
93                partition: DEFAULT_PARTITION,
94            })
95    }
96
97    /// Appends a noop record to the topic.
98    /// It only appends a noop record if the topic is empty.
99    async fn append_noop_record(
100        &self,
101        topic: &String,
102        partition_client: &PartitionClient,
103    ) -> Result<()> {
104        let end_offset = partition_client
105            .get_offset(OffsetAt::Latest)
106            .await
107            .with_context(|_| KafkaGetOffsetSnafu {
108                topic: topic.clone(),
109                partition: DEFAULT_PARTITION,
110            })?;
111        if end_offset > 0 {
112            return Ok(());
113        }
114
115        partition_client
116            .produce(
117                vec![Record {
118                    key: None,
119                    value: None,
120                    timestamp: chrono::Utc::now(),
121                    headers: Default::default(),
122                }],
123                Compression::Lz4,
124            )
125            .await
126            .context(ProduceRecordSnafu { topic })?;
127        debug!("Appended a noop record to topic {}", topic);
128
129        Ok(())
130    }
131
132    /// Creates topics in Kafka.
133    pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
134        let tasks = topics
135            .iter()
136            .map(|topic| async { self.create_topic(topic, &self.client).await })
137            .collect::<Vec<_>>();
138        futures::future::try_join_all(tasks).await.map(|_| ())
139    }
140
141    /// Prepares topics in Kafka.
142    ///
143    /// It appends a noop record to each topic if the topic is empty.
144    pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
145        // Try to create missing topics.
146        let tasks = topics
147            .iter()
148            .map(|topic| async { self.prepare_topic(topic).await })
149            .collect::<Vec<_>>();
150        futures::future::try_join_all(tasks).await.map(|_| ())
151    }
152
153    fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
154        matches!(
155            e,
156            &RsKafkaError::ServerError {
157                protocol_error: TopicAlreadyExists,
158                ..
159            }
160        )
161    }
162}
163
#[cfg(test)]
impl KafkaTopicCreator {
    /// Test helper: deletes all the given topics concurrently.
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let deletions = topics
            .iter()
            .map(|topic| self.delete_topic(topic, &self.client));
        futures::future::try_join_all(deletions).await?;
        Ok(())
    }

    /// Test helper: deletes one topic; a missing topic is not an error.
    async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
        let controller = client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        match controller.delete_topic(topic, 10).await {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
            }
            Err(e) if Self::is_unknown_topic_err(&e) => {
                info!("The topic {} does not exist", topic);
            }
            Err(e) => panic!("Failed to delete a topic {}, error: {}", topic, e),
        }
        Ok(())
    }

    /// Returns true for Kafka's `UnknownTopicOrPartition` protocol error.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        if let RsKafkaError::ServerError { protocol_error, .. } = e {
            matches!(
                protocol_error,
                rskafka::client::error::ProtocolError::UnknownTopicOrPartition
            )
        } else {
            false
        }
    }

    /// Test helper: builds a [PartitionClient], panicking on failure.
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic).await.unwrap()
    }
}
208/// Builds a kafka [Client](rskafka::client::Client).
209pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
210    // Builds an kafka controller client for creating topics.
211    let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
212        .backoff_config(DEFAULT_BACKOFF_CONFIG);
213    if let Some(sasl) = &connection.sasl {
214        builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
215    };
216    if let Some(tls) = &connection.tls {
217        builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
218    };
219    builder
220        .build()
221        .await
222        .with_context(|_| BuildKafkaClientSnafu {
223            broker_endpoints: connection.broker_endpoints.clone(),
224        })
225}
226
227/// Builds a [KafkaTopicCreator].
228pub async fn build_kafka_topic_creator(
229    connection: &KafkaConnectionConfig,
230    kafka_topic: &KafkaTopicConfig,
231) -> Result<KafkaTopicCreator> {
232    let client = build_kafka_client(connection).await?;
233    Ok(KafkaTopicCreator {
234        client,
235        num_partitions: kafka_topic.num_partitions,
236        replication_factor: kafka_topic.replication_factor,
237        create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
238    })
239}
240
#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a [KafkaTopicCreator] against the given brokers using default
    /// connection and topic configurations.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        let kafka_topic = KafkaTopicConfig::default();

        build_kafka_topic_creator(&connection, &kafka_topic)
            .await
            .unwrap()
    }

    /// Produces `num_records` small test records to the partition, one at a time.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            partition_client
                .produce(
                    vec![Record {
                        key: Some(b"test".to_vec()),
                        value: Some(format!("test {}", i).as_bytes().to_vec()),
                        timestamp: chrono::Utc::now(),
                        headers: Default::default(),
                    }],
                    Compression::Lz4,
                )
                .await
                .unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);

        // The topic is empty, so a noop record should be appended,
        // advancing the end offset to 1.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_non_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);

        // The topic is not empty, so no noop record is appended.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "create_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        // Creating an existing topic should also succeed (idempotent).
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "prepare_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        // Preparing an empty topic appends exactly one noop record:
        // offsets go from [0, 0) to [0, 1).
        let partition_client = creator.partition_client(&topic).await.unwrap();
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let prefix = "prepare_topic_with_stale_records_without_pruning";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        // The topic already has 10 records, so prepare appends nothing and
        // the existing records are left untouched.
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 10);
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);
    }
}
412}