common_meta/wal_options_allocator/
topic_creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17    DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28    KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
29};
30
31// Each topic only has one partition for now.
32// The `DEFAULT_PARTITION` refers to the index of the partition.
33const DEFAULT_PARTITION: i32 = 0;
34
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
    /// Shared Kafka client, used both to obtain the controller client (for topic
    /// creation/deletion) and to build per-topic partition clients.
    client: Client,
    /// The number of partitions per topic.
    num_partitions: i32,
    /// The replication factor of each topic.
    replication_factor: i16,
    /// The timeout of topic creation in milliseconds.
    create_topic_timeout: i32,
}
45
46impl KafkaTopicCreator {
47    pub fn client(&self) -> &Client {
48        &self.client
49    }
50
51    async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
52        let controller = client
53            .controller_client()
54            .context(BuildKafkaCtrlClientSnafu)?;
55        match controller
56            .create_topic(
57                topic,
58                self.num_partitions,
59                self.replication_factor,
60                self.create_topic_timeout,
61            )
62            .await
63        {
64            Ok(_) => {
65                info!("Successfully created topic {}", topic);
66                Ok(())
67            }
68            Err(e) => {
69                if Self::is_topic_already_exist_err(&e) {
70                    info!("The topic {} already exists", topic);
71                    Ok(())
72                } else {
73                    error!(e; "Failed to create a topic {}", topic);
74                    Err(e).context(CreateKafkaWalTopicSnafu)
75                }
76            }
77        }
78    }
79
80    async fn prepare_topic(&self, topic: &String) -> Result<()> {
81        let partition_client = self.partition_client(topic).await?;
82        self.append_noop_record(topic, &partition_client).await?;
83        Ok(())
84    }
85
86    /// Creates a [PartitionClient] for the given topic.
87    async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
88        self.client
89            .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
90            .await
91            .context(KafkaPartitionClientSnafu {
92                topic,
93                partition: DEFAULT_PARTITION,
94            })
95    }
96
97    /// Appends a noop record to the topic.
98    /// It only appends a noop record if the topic is empty.
99    async fn append_noop_record(
100        &self,
101        topic: &String,
102        partition_client: &PartitionClient,
103    ) -> Result<()> {
104        let end_offset = partition_client
105            .get_offset(OffsetAt::Latest)
106            .await
107            .context(KafkaGetOffsetSnafu {
108                topic: topic.to_string(),
109                partition: DEFAULT_PARTITION,
110            })?;
111        if end_offset > 0 {
112            return Ok(());
113        }
114
115        partition_client
116            .produce(
117                vec![Record {
118                    key: None,
119                    value: None,
120                    timestamp: chrono::Utc::now(),
121                    headers: Default::default(),
122                }],
123                Compression::Lz4,
124            )
125            .await
126            .context(ProduceRecordSnafu { topic })?;
127        debug!("Appended a noop record to topic {}", topic);
128
129        Ok(())
130    }
131
132    /// Creates topics in Kafka.
133    pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
134        let tasks = topics
135            .iter()
136            .map(|topic| async { self.create_topic(topic, &self.client).await })
137            .collect::<Vec<_>>();
138        futures::future::try_join_all(tasks).await.map(|_| ())
139    }
140
141    /// Prepares topics in Kafka.
142    ///
143    /// It appends a noop record to each topic if the topic is empty.
144    pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
145        // Try to create missing topics.
146        let tasks = topics
147            .iter()
148            .map(|topic| async { self.prepare_topic(topic).await })
149            .collect::<Vec<_>>();
150        futures::future::try_join_all(tasks).await.map(|_| ())
151    }
152
153    fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
154        matches!(
155            e,
156            &RsKafkaError::ServerError {
157                protocol_error: TopicAlreadyExists,
158                ..
159            }
160        )
161    }
162}
163
#[cfg(test)]
impl KafkaTopicCreator {
    /// Deletes all `topics` concurrently. Test-only cleanup helper.
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
            .map(|topic| self.delete_topic(topic))
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

    /// Deletes `topic`, treating "unknown topic or partition" as success so that
    /// test cleanup is idempotent.
    async fn delete_topic(&self, topic: &str) -> Result<()> {
        let controller = self
            .client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        // `10` is the deletion timeout passed to rskafka (`timeout_ms`) —
        // presumably milliseconds, matching `create_topic_timeout`; verify if changed.
        match controller.delete_topic(topic, 10).await {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
                Ok(())
            }
            Err(e) => {
                if Self::is_unknown_topic_err(&e) {
                    info!("The topic {} does not exist", topic);
                    Ok(())
                } else {
                    // Fail fast in tests: a half-deleted topic would poison later runs.
                    panic!("Failed to delete a topic {}, error: {}", topic, e);
                }
            }
        }
    }

    /// Returns true if `e` is the broker's "unknown topic or partition" protocol error.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        matches!(
            e,
            &RsKafkaError::ServerError {
                protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
                ..
            }
        )
    }

    /// Returns a [PartitionClient] for `topic`, panicking on failure. Test-only.
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic).await.unwrap()
    }
}
208/// Builds a kafka [Client](rskafka::client::Client).
209pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
210    // Builds an kafka controller client for creating topics.
211    let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
212        .backoff_config(DEFAULT_BACKOFF_CONFIG);
213    if let Some(sasl) = &connection.sasl {
214        builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
215    };
216    if let Some(tls) = &connection.tls {
217        builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
218    };
219    builder
220        .build()
221        .await
222        .with_context(|_| BuildKafkaClientSnafu {
223            broker_endpoints: connection.broker_endpoints.clone(),
224        })
225}
226
227/// Builds a [KafkaTopicCreator].
228pub async fn build_kafka_topic_creator(
229    connection: &KafkaConnectionConfig,
230    kafka_topic: &KafkaTopicConfig,
231) -> Result<KafkaTopicCreator> {
232    let client = build_kafka_client(connection).await?;
233    Ok(KafkaTopicCreator {
234        client,
235        num_partitions: kafka_topic.num_partitions,
236        replication_factor: kafka_topic.replication_factor,
237        create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
238    })
239}
240
#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a [KafkaTopicCreator] against the given brokers with default topic config.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        let kafka_topic = KafkaTopicConfig::default();

        build_kafka_topic_creator(&connection, &kafka_topic)
            .await
            .unwrap()
    }

    /// Appends `num_records` non-empty records to the partition, one produce call each.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            partition_client
                .produce(
                    vec![Record {
                        key: Some(b"test".to_vec()),
                        value: Some(format!("test {}", i).as_bytes().to_vec()),
                        timestamp: chrono::Utc::now(),
                        headers: Default::default(),
                    }],
                    Compression::Lz4,
                )
                .await
                .unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        creator.create_topics(&[topic.to_string()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);

        // The topic is empty, so a noop record should be appended.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_non_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);

        // The topic is not empty, so no noop record is appended.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "create_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        // Creating an existing topic is treated as success, so this should be ok.
        creator.create_topics(&[topic.to_string()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "prepare_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        // Preparing an empty topic appends exactly one (noop) record.
        let partition_client = creator.partition_client(&topic).await.unwrap();
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let prefix = "prepare_topic_with_stale_records_without_pruning";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        // The topic already has records, so prepare must not append anything.
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 10);
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);
    }
}