common_meta/wal_options_allocator/topic_creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17    DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, KafkaConnectionConfig, KafkaTopicConfig,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28    KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
29};
30
// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;

/// Creates topics in kafka.
pub struct KafkaTopicCreator {
    /// The client used to communicate with the Kafka cluster.
    client: Client,
    /// The number of partitions per topic.
    num_partitions: i32,
    /// The replication factor of each topic.
    replication_factor: i16,
    /// The timeout of topic creation in milliseconds.
    create_topic_timeout: i32,
}
45
46impl KafkaTopicCreator {
47    pub fn client(&self) -> &Client {
48        &self.client
49    }
50
51    async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
52        let controller = client
53            .controller_client()
54            .context(BuildKafkaCtrlClientSnafu)?;
55        match controller
56            .create_topic(
57                topic,
58                self.num_partitions,
59                self.replication_factor,
60                self.create_topic_timeout,
61            )
62            .await
63        {
64            Ok(_) => {
65                info!("Successfully created topic {}", topic);
66                Ok(())
67            }
68            Err(e) => {
69                if Self::is_topic_already_exist_err(&e) {
70                    info!("The topic {} already exists", topic);
71                    Ok(())
72                } else {
73                    error!(e; "Failed to create a topic {}", topic);
74                    Err(e).context(CreateKafkaWalTopicSnafu)
75                }
76            }
77        }
78    }
79
80    async fn prepare_topic(&self, topic: &String) -> Result<()> {
81        let partition_client = self.partition_client(topic).await?;
82        self.append_noop_record(topic, &partition_client).await?;
83        Ok(())
84    }
85
86    /// Creates a [PartitionClient] for the given topic.
87    async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
88        self.client
89            .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
90            .await
91            .context(KafkaPartitionClientSnafu {
92                topic,
93                partition: DEFAULT_PARTITION,
94            })
95    }
96
97    /// Appends a noop record to the topic.
98    /// It only appends a noop record if the topic is empty.
99    async fn append_noop_record(
100        &self,
101        topic: &String,
102        partition_client: &PartitionClient,
103    ) -> Result<()> {
104        let end_offset = partition_client
105            .get_offset(OffsetAt::Latest)
106            .await
107            .with_context(|_| KafkaGetOffsetSnafu {
108                topic: topic.clone(),
109                partition: DEFAULT_PARTITION,
110            })?;
111        if end_offset > 0 {
112            return Ok(());
113        }
114
115        partition_client
116            .produce(
117                vec![Record {
118                    key: None,
119                    value: None,
120                    timestamp: chrono::Utc::now(),
121                    headers: Default::default(),
122                }],
123                Compression::Lz4,
124            )
125            .await
126            .context(ProduceRecordSnafu { topic })?;
127        debug!("Appended a noop record to topic {}", topic);
128
129        Ok(())
130    }
131
132    /// Creates topics in Kafka.
133    pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
134        let tasks = topics
135            .iter()
136            .map(|topic| async { self.create_topic(topic, &self.client).await })
137            .collect::<Vec<_>>();
138        futures::future::try_join_all(tasks).await.map(|_| ())
139    }
140
141    /// Prepares topics in Kafka.
142    ///
143    /// It appends a noop record to each topic if the topic is empty.
144    pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
145        // Try to create missing topics.
146        let tasks = topics
147            .iter()
148            .map(|topic| async { self.prepare_topic(topic).await })
149            .collect::<Vec<_>>();
150        futures::future::try_join_all(tasks).await.map(|_| ())
151    }
152
153    fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
154        matches!(
155            e,
156            &RsKafkaError::ServerError {
157                protocol_error: TopicAlreadyExists,
158                ..
159            }
160        )
161    }
162}
163
#[cfg(test)]
impl KafkaTopicCreator {
    /// Deletes `topics` concurrently; topics that do not exist are ignored.
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        // `delete_topic` already returns a future; no extra async block needed.
        let tasks = topics.iter().map(|topic| self.delete_topic(topic));
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

    /// Deletes a single topic, treating "unknown topic" as success so cleanup
    /// is idempotent. Panics on any other error (test-only helper).
    async fn delete_topic(&self, topic: &str) -> Result<()> {
        let controller = self
            .client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        // NOTE(review): `10` is the deletion timeout passed to Kafka
        // (presumably milliseconds, matching rskafka's timeout parameter) —
        // kept short since this is test-only.
        match controller.delete_topic(topic, 10).await {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
                Ok(())
            }
            Err(e) => {
                if Self::is_unknown_topic_err(&e) {
                    info!("The topic {} does not exist", topic);
                    Ok(())
                } else {
                    panic!("Failed to delete a topic {}, error: {}", topic, e);
                }
            }
        }
    }

    /// Returns true if the error indicates that the topic does not exist.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        matches!(
            e,
            &RsKafkaError::ServerError {
                protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
                ..
            }
        )
    }

    /// Returns a [PartitionClient] for `topic`, panicking on failure (test-only).
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic).await.unwrap()
    }
}
208
209/// Builds a kafka [Client](rskafka::client::Client).
210pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
211    // Builds an kafka controller client for creating topics.
212    let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
213        .backoff_config(DEFAULT_BACKOFF_CONFIG)
214        .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT));
215    if let Some(sasl) = &connection.sasl {
216        builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
217    };
218    if let Some(tls) = &connection.tls {
219        builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
220    };
221    builder
222        .build()
223        .await
224        .with_context(|_| BuildKafkaClientSnafu {
225            broker_endpoints: connection.broker_endpoints.clone(),
226        })
227}
228
229/// Builds a [KafkaTopicCreator].
230pub async fn build_kafka_topic_creator(
231    connection: &KafkaConnectionConfig,
232    kafka_topic: &KafkaTopicConfig,
233) -> Result<KafkaTopicCreator> {
234    let client = build_kafka_client(connection).await?;
235    Ok(KafkaTopicCreator {
236        client,
237        num_partitions: kafka_topic.num_partitions,
238        replication_factor: kafka_topic.replication_factor,
239        create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
240    })
241}
242
#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a [KafkaTopicCreator] against `broker_endpoints` with a default
    /// topic configuration.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        let kafka_topic = KafkaTopicConfig::default();

        build_kafka_topic_creator(&connection, &kafka_topic)
            .await
            .unwrap()
    }

    /// Produces `num_records` small test records, one produce call each.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            partition_client
                .produce(
                    vec![Record {
                        key: Some(b"test".to_vec()),
                        value: Some(format!("test {}", i).as_bytes().to_vec()),
                        timestamp: chrono::Utc::now(),
                        headers: Default::default(),
                    }],
                    Compression::Lz4,
                )
                .await
                .unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);

        // The topic is empty, so a noop record is appended and the end offset
        // advances to 1.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_non_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);

        // The topic is not empty, so no noop record is appended.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "create_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        // Should be ok: creating an existing topic is idempotent.
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "prepare_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        // Preparing an empty topic appends exactly one noop record.
        let partition_client = creator.partition_client(&topic).await.unwrap();
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let prefix = "prepare_topic_with_stale_records_without_pruning";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        // The topic already has records, so prepare_topic appends nothing and
        // both offsets are unchanged.
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 10);
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);
    }
}