common_meta/wal_options_allocator/
topic_creator.rsuse common_telemetry::{error, info};
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::ResultExt;
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
TlsConfigSnafu,
};
const DEFAULT_PARTITION: i32 = 0;
pub struct KafkaTopicCreator {
client: Client,
num_partitions: i32,
replication_factor: i16,
create_topic_timeout: i32,
}
impl KafkaTopicCreator {
async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
match controller
.create_topic(
topic,
self.num_partitions,
self.replication_factor,
self.create_topic_timeout,
)
.await
{
Ok(_) => {
info!("Successfully created topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_topic_already_exist_err(&e) {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}
async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(BuildKafkaPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})?;
partition_client
.produce(
vec![Record {
key: None,
value: None,
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::Lz4,
)
.await
.context(ProduceRecordSnafu { topic })?;
Ok(())
}
pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> {
let tasks = topics
.iter()
.map(|topic| async {
self.create_topic(topic, &self.client).await?;
self.append_noop_record(topic, &self.client).await?;
Ok(())
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
}
)
}
}
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
let backoff_config = BackoffConfig {
init_backoff: config.backoff.init,
max_backoff: config.backoff.max,
base: config.backoff.base as f64,
deadline: config.backoff.deadline,
};
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &config.connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
let client = builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: config.connection.broker_endpoints.clone(),
})?;
Ok(KafkaTopicCreator {
client,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
})
}