common_meta/wal_options_allocator/
topic_creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error, info};
16use common_wal::config::kafka::common::{
17    KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
18};
19use rskafka::client::error::Error as RsKafkaError;
20use rskafka::client::error::ProtocolError::TopicAlreadyExists;
21use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
22use rskafka::client::{Client, ClientBuilder};
23use rskafka::record::Record;
24use snafu::ResultExt;
25
26use crate::error::{
27    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
28    KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
29    Result, TlsConfigSnafu,
30};
31
32// Each topic only has one partition for now.
33// The `DEFAULT_PARTITION` refers to the index of the partition.
34const DEFAULT_PARTITION: i32 = 0;
35
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
    /// The Kafka client used to talk to the brokers.
    client: Client,
    /// The number of partitions per topic.
    num_partitions: i32,
    /// The replication factor of each topic.
    replication_factor: i16,
    /// The timeout of topic creation in milliseconds.
    create_topic_timeout: i32,
}
46
47impl KafkaTopicCreator {
48    pub fn client(&self) -> &Client {
49        &self.client
50    }
51
52    async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
53        let controller = client
54            .controller_client()
55            .context(BuildKafkaCtrlClientSnafu)?;
56        match controller
57            .create_topic(
58                topic,
59                self.num_partitions,
60                self.replication_factor,
61                self.create_topic_timeout,
62            )
63            .await
64        {
65            Ok(_) => {
66                info!("Successfully created topic {}", topic);
67                Ok(())
68            }
69            Err(e) => {
70                if Self::is_topic_already_exist_err(&e) {
71                    info!("The topic {} already exists", topic);
72                    Ok(())
73                } else {
74                    error!(e; "Failed to create a topic {}", topic);
75                    Err(e).context(CreateKafkaWalTopicSnafu)
76                }
77            }
78        }
79    }
80
81    async fn prepare_topic(&self, topic: &String) -> Result<()> {
82        let partition_client = self.partition_client(topic).await?;
83        self.append_noop_record(topic, &partition_client).await?;
84        Ok(())
85    }
86
87    /// Creates a [PartitionClient] for the given topic.
88    async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
89        self.client
90            .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
91            .await
92            .context(KafkaPartitionClientSnafu {
93                topic,
94                partition: DEFAULT_PARTITION,
95            })
96    }
97
98    /// Appends a noop record to the topic.
99    /// It only appends a noop record if the topic is empty.
100    async fn append_noop_record(
101        &self,
102        topic: &String,
103        partition_client: &PartitionClient,
104    ) -> Result<()> {
105        let end_offset = partition_client
106            .get_offset(OffsetAt::Latest)
107            .await
108            .context(KafkaGetOffsetSnafu {
109                topic: topic.to_string(),
110                partition: DEFAULT_PARTITION,
111            })?;
112        if end_offset > 0 {
113            return Ok(());
114        }
115
116        partition_client
117            .produce(
118                vec![Record {
119                    key: None,
120                    value: None,
121                    timestamp: chrono::Utc::now(),
122                    headers: Default::default(),
123                }],
124                Compression::Lz4,
125            )
126            .await
127            .context(ProduceRecordSnafu { topic })?;
128        debug!("Appended a noop record to topic {}", topic);
129
130        Ok(())
131    }
132
133    /// Creates topics in Kafka.
134    pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
135        let tasks = topics
136            .iter()
137            .map(|topic| async { self.create_topic(topic, &self.client).await })
138            .collect::<Vec<_>>();
139        futures::future::try_join_all(tasks).await.map(|_| ())
140    }
141
142    /// Prepares topics in Kafka.
143    ///
144    /// It appends a noop record to each topic if the topic is empty.
145    pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
146        // Try to create missing topics.
147        let tasks = topics
148            .iter()
149            .map(|topic| async { self.prepare_topic(topic).await })
150            .collect::<Vec<_>>();
151        futures::future::try_join_all(tasks).await.map(|_| ())
152    }
153
154    fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
155        matches!(
156            e,
157            &RsKafkaError::ServerError {
158                protocol_error: TopicAlreadyExists,
159                ..
160            }
161        )
162    }
163}
164
#[cfg(test)]
impl KafkaTopicCreator {
    /// Deletes topics concurrently, ignoring topics that do not exist (test-only).
    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
            .map(|topic| self.delete_topic(topic))
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.map(|_| ())
    }

    /// Deletes a single topic; an unknown topic is not an error.
    async fn delete_topic(&self, topic: &str) -> Result<()> {
        let controller = self
            .client
            .controller_client()
            .context(BuildKafkaCtrlClientSnafu)?;
        // NOTE(review): `10` is the broker-side timeout passed to
        // `delete_topic` (presumably milliseconds) — confirm against rskafka docs.
        match controller.delete_topic(topic, 10).await {
            Ok(_) => {
                info!("Successfully deleted topic {}", topic);
                Ok(())
            }
            Err(e) => {
                if Self::is_unknown_topic_err(&e) {
                    info!("The topic {} does not exist", topic);
                    Ok(())
                } else {
                    // Test-only helper: fail fast on unexpected errors.
                    panic!("Failed to delete a topic {}, error: {}", topic, e);
                }
            }
        }
    }

    /// Returns true if `e` is the broker reporting an unknown topic or
    /// partition.
    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
        matches!(
            e,
            &RsKafkaError::ServerError {
                protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
                ..
            }
        )
    }

    /// Returns a [PartitionClient] for `topic`, panicking on failure (test-only).
    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
        self.partition_client(topic).await.unwrap()
    }
}
209/// Builds a kafka [Client](rskafka::client::Client).
210pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
211    // Builds an kafka controller client for creating topics.
212    let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
213        .await
214        .context(ResolveKafkaEndpointSnafu)?;
215    let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
216    if let Some(sasl) = &connection.sasl {
217        builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
218    };
219    if let Some(tls) = &connection.tls {
220        builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
221    };
222    builder
223        .build()
224        .await
225        .with_context(|_| BuildKafkaClientSnafu {
226            broker_endpoints: connection.broker_endpoints.clone(),
227        })
228}
229
230/// Builds a [KafkaTopicCreator].
231pub async fn build_kafka_topic_creator(
232    connection: &KafkaConnectionConfig,
233    kafka_topic: &KafkaTopicConfig,
234) -> Result<KafkaTopicCreator> {
235    let client = build_kafka_client(connection).await?;
236    Ok(KafkaTopicCreator {
237        client,
238        num_partitions: kafka_topic.num_partitions,
239        replication_factor: kafka_topic.replication_factor,
240        create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
241    })
242}
243
#[cfg(test)]
mod tests {
    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;

    /// Builds a [KafkaTopicCreator] against the given brokers using the
    /// default topic configuration.
    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
        let connection = KafkaConnectionConfig {
            broker_endpoints,
            ..Default::default()
        };
        let kafka_topic = KafkaTopicConfig::default();

        build_kafka_topic_creator(&connection, &kafka_topic)
            .await
            .unwrap()
    }

    /// Appends `num_records` dummy records to the partition, one produce
    /// call per record.
    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
        for i in 0..num_records {
            partition_client
                .produce(
                    vec![Record {
                        key: Some(b"test".to_vec()),
                        value: Some(format!("test {}", i).as_bytes().to_vec()),
                        timestamp: chrono::Utc::now(),
                        headers: Default::default(),
                    }],
                    Compression::Lz4,
                )
                .await
                .unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_append_noop_record_to_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        creator.create_topics(&[topic.to_string()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);

        // The topic is empty, so a noop record is appended.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_append_noop_record_to_non_empty_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "append_noop_record_to_non_empty_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);

        // The topic is not empty, so no noop record is appended.
        creator
            .append_noop_record(&topic, &partition_client)
            .await
            .unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 2);
    }

    #[tokio::test]
    async fn test_create_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "create_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        // Creating an existing topic should also be ok (idempotent).
        creator.create_topics(&[topic.to_string()]).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 0);
    }

    #[tokio::test]
    async fn test_prepare_topic() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let prefix = "prepare_topic";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        // The empty topic receives exactly one noop record: offsets go 0..1.
        let partition_client = creator.partition_client(&topic).await.unwrap();
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);

        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 1);
    }

    #[tokio::test]
    async fn test_prepare_topic_with_stale_records_without_pruning() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let prefix = "prepare_topic_with_stale_records_without_pruning";
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

        creator.prepare_topic(&topic).await.unwrap();

        // The topic already had 10 records, so prepare is a no-op:
        // offsets are unchanged.
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(end_offset, 10);
        let start_offset = partition_client
            .get_offset(OffsetAt::Earliest)
            .await
            .unwrap();
        assert_eq!(start_offset, 0);
    }
}