common_meta/wal_options_allocator/
topic_manager.rsuse std::collections::HashSet;
use crate::error::Result;
use crate::key::topic_name::{TopicNameKey, TopicNameManager};
use crate::kv_backend::KvBackendRef;
/// Manages Kafka topic names by delegating to a [`TopicNameManager`] built on a
/// key-value backend.
pub struct KafkaTopicManager {
    /// Handle used to list, migrate and store topic name entries.
    topic_name_manager: TopicNameManager,
}
impl KafkaTopicManager {
    /// Creates a topic manager whose [`TopicNameManager`] is built on `kv_backend`.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self {
            // `kv_backend` is owned and not used afterwards, so it is moved in
            // instead of being cloned first.
            topic_name_manager: TopicNameManager::new(kv_backend),
        }
    }

    /// Returns the topic names currently known to the topic name manager.
    ///
    /// `update_legacy_topics` is invoked first, so that whatever it migrates is
    /// visible to the subsequent `range` call.
    /// NOTE(review): presumably this moves topics stored under the legacy key
    /// into the per-topic key layout; confirm against `TopicNameManager`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `update_legacy_topics` or `range`.
    async fn restore_topics(&self) -> Result<Vec<String>> {
        self.topic_name_manager.update_legacy_topics().await?;
        let topics = self.topic_name_manager.range().await?;
        Ok(topics)
    }

    /// Returns the entries of `all_topics` that are not among the restored topics.
    ///
    /// The returned references borrow from `all_topics` and keep its order.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while restoring the existing topics.
    pub async fn get_topics_to_create<'a>(
        &self,
        all_topics: &'a [String],
    ) -> Result<Vec<&'a String>> {
        let existing_topics = self.restore_topics().await?;
        // Build the set once so each membership test below is a hash lookup.
        let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
        let topics_to_create = all_topics
            .iter()
            .filter(|topic| !existing_topic_set.contains(*topic))
            .collect();
        Ok(topics_to_create)
    }

    /// Stores every name in `topics` through a single `batch_put` call.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `batch_put`.
    pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
        self.topic_name_manager
            .batch_put(
                topics
                    .iter()
                    .map(|topic| TopicNameKey::new(topic))
                    .collect(),
            )
            .await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::key::LEGACY_TOPIC_KEY_PREFIX;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::rpc::store::PutRequest;

    /// Builds `count` topic names of the form `<prefix>_<index>`, starting at index 0.
    fn topic_names(prefix: &str, count: usize) -> Vec<String> {
        (0..count)
            .map(|idx| format!("{}_{}", prefix, idx))
            .collect()
    }

    /// Returns references to `topics`, sorted, for order-insensitive comparison.
    fn sorted_refs(topics: &[String]) -> Vec<&String> {
        let mut refs: Vec<&String> = topics.iter().collect();
        refs.sort();
        refs
    }

    // Tests that topics stored under the legacy key are picked up, after which the
    // legacy key is gone and the topics are returned by `restore_topics`.
    #[tokio::test]
    async fn test_restore_legacy_persisted_topics() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let manager = KafkaTopicManager::new(kv_backend.clone());
        let all_topics = topic_names("greptimedb_wal_topic", 16);

        // Nothing is stored yet, so every topic is reported as missing.
        let mut missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        missing.sort();
        assert_eq!(sorted_refs(&all_topics), missing);

        // Write all topic names as one JSON array under the legacy key.
        let legacy_payload = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]";
        let legacy_put = PutRequest {
            key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
            value: legacy_payload.as_bytes().to_vec(),
            prev_kv: true,
        };
        let put_response = kv_backend.put(legacy_put).await.unwrap();
        assert!(put_response.prev_kv.is_none());

        // With the legacy entry in place, no topic remains to be created.
        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        assert!(missing.is_empty());

        // The legacy key is no longer present in the backend.
        let legacy_entry = kv_backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(legacy_entry.is_none());

        // Restoring yields exactly the original topic names.
        let mut restored = manager.restore_topics().await.unwrap();
        restored.sort();
        let mut wanted = all_topics.clone();
        wanted.sort();
        assert_eq!(wanted, restored);
    }

    // Tests that topics stored via `persist_topics` are no longer reported as missing.
    #[tokio::test]
    async fn test_restore_persisted_topics() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let manager = KafkaTopicManager::new(kv_backend);
        let all_topics = topic_names("greptimedb_wal_topic", 16);

        // Before persisting, every topic must be created.
        let mut missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        missing.sort();
        assert_eq!(sorted_refs(&all_topics), missing);

        manager.persist_topics(&all_topics).await.unwrap();

        // After persisting, nothing is left to create.
        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        assert!(missing.is_empty());
    }
}