1use std::collections::HashMap;
30use std::fmt::{self, Display};
31
32use common_wal::options::WalOptions;
33use serde::{Deserialize, Serialize};
34use snafu::OptionExt;
35use store_api::storage::{RegionId, RegionNumber};
36use table::metadata::TableId;
37
38use crate::ddl::utils::parse_region_wal_options;
39use crate::error::{Error, InvalidMetadataSnafu, Result};
40use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
41use crate::kv_backend::txn::{Txn, TxnOp};
42use crate::kv_backend::KvBackendRef;
43use crate::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
44use crate::rpc::KeyValue;
45
46#[derive(Debug, Clone, PartialEq)]
49pub struct TopicRegionKey<'a> {
50 pub region_id: RegionId,
51 pub topic: &'a str,
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct TopicRegionValue;
56
57impl<'a> TopicRegionKey<'a> {
58 pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59 Self { region_id, topic }
60 }
61
62 pub fn range_topic_key(topic: &str) -> String {
63 format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64 }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68 fn to_bytes(&self) -> Vec<u8> {
69 self.to_string().into_bytes()
70 }
71
72 fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73 let key = std::str::from_utf8(bytes).map_err(|e| {
74 InvalidMetadataSnafu {
75 err_msg: format!(
76 "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77 String::from_utf8_lossy(bytes)
78 ),
79 }
80 .build()
81 })?;
82 TopicRegionKey::try_from(key)
83 }
84}
85
86impl Display for TopicRegionKey<'_> {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 write!(
89 f,
90 "{}{}",
91 Self::range_topic_key(self.topic),
92 self.region_id.as_u64()
93 )
94 }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98 type Error = Error;
99
100 fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102 let captures = TOPIC_REGION_PATTERN
103 .captures(value)
104 .context(InvalidMetadataSnafu {
105 err_msg: format!("Invalid TopicRegionKey: {}", value),
106 })?;
107 let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108 let region_id = captures[2].parse::<u64>().map_err(|_| {
109 InvalidMetadataSnafu {
110 err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111 }
112 .build()
113 })?;
114 Ok(TopicRegionKey {
115 region_id: RegionId::from_u64(region_id),
116 topic,
117 })
118 }
119}
120
121fn topic_region_decoder(value: &KeyValue) -> Result<TopicRegionKey<'_>> {
122 let key = TopicRegionKey::from_bytes(&value.key)?;
123 Ok(key)
124}
125
126pub struct TopicRegionManager {
128 kv_backend: KvBackendRef,
129}
130
131impl TopicRegionManager {
132 pub fn new(kv_backend: KvBackendRef) -> Self {
133 Self { kv_backend }
134 }
135
136 pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
137 let put_req = PutRequest {
138 key: key.to_bytes(),
139 value: vec![],
140 prev_kv: false,
141 };
142 self.kv_backend.put(put_req).await?;
143 Ok(())
144 }
145
146 pub async fn batch_put(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
147 let req = BatchPutRequest {
148 kvs: keys
149 .into_iter()
150 .map(|key| KeyValue {
151 key: key.to_bytes(),
152 value: vec![],
153 })
154 .collect(),
155 prev_kv: false,
156 };
157 self.kv_backend.batch_put(req).await?;
158 Ok(())
159 }
160
161 pub fn build_create_txn(
162 &self,
163 table_id: TableId,
164 region_wal_options: &HashMap<RegionNumber, String>,
165 ) -> Result<Txn> {
166 let region_wal_options = parse_region_wal_options(region_wal_options)?;
167 let topic_region_mapping = self.get_topic_region_mapping(table_id, ®ion_wal_options);
168 let topic_region_keys = topic_region_mapping
169 .iter()
170 .map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id))
171 .collect::<Vec<_>>();
172 let operations = topic_region_keys
173 .into_iter()
174 .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
175 .collect::<Vec<_>>();
176 Ok(Txn::new().and_then(operations))
177 }
178
179 pub async fn regions(&self, topic: &str) -> Result<Vec<RegionId>> {
181 let prefix = TopicRegionKey::range_topic_key(topic);
182 let req = RangeRequest::new().with_prefix(prefix.as_bytes());
183 let resp = self.kv_backend.range(req).await?;
184 let region_ids = resp
185 .kvs
186 .iter()
187 .map(topic_region_decoder)
188 .collect::<Result<Vec<_>>>()?;
189 Ok(region_ids.iter().map(|key| key.region_id).collect())
190 }
191
192 pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
193 let raw_key = key.to_bytes();
194 self.kv_backend.delete(&raw_key, false).await?;
195 Ok(())
196 }
197
198 pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
199 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
200 let req = BatchDeleteRequest {
201 keys: raw_keys,
202 prev_kv: false,
203 };
204 self.kv_backend.batch_delete(req).await?;
205 Ok(())
206 }
207
208 pub fn get_topic_region_mapping<'a>(
214 &self,
215 table_id: TableId,
216 region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
217 ) -> Vec<(RegionId, &'a str)> {
218 region_wal_options
219 .keys()
220 .filter_map(
221 |region_number| match region_wal_options.get(region_number) {
222 Some(WalOptions::Kafka(kafka)) => {
223 let region_id = RegionId::new(table_id, *region_number);
224 Some((region_id, kafka.topic.as_str()))
225 }
226 Some(WalOptions::RaftEngine) => None,
227 Some(WalOptions::Noop) => None,
228 None => None,
229 },
230 )
231 .collect::<Vec<_>>()
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use std::sync::Arc;
238
239 use common_wal::options::KafkaWalOptions;
240
241 use super::*;
242 use crate::kv_backend::memory::MemoryKvBackend;
243
244 #[tokio::test]
245 async fn test_topic_region_manager() {
246 let kv_backend = Arc::new(MemoryKvBackend::default());
247 let manager = TopicRegionManager::new(kv_backend.clone());
248
249 let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
250 let keys = (0..64)
251 .map(|i| TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]))
252 .collect::<Vec<_>>();
253
254 manager.batch_put(keys.clone()).await.unwrap();
255
256 let mut key_values = manager.regions(&topics[0]).await.unwrap();
257 let expected = keys
258 .iter()
259 .filter_map(|key| {
260 if key.topic == topics[0] {
261 Some(key.region_id)
262 } else {
263 None
264 }
265 })
266 .collect::<Vec<_>>();
267 key_values.sort_by_key(|id| id.as_u64());
268 assert_eq!(key_values, expected);
269
270 let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
271 manager.delete(key.clone()).await.unwrap();
272 let mut key_values = manager.regions(&topics[0]).await.unwrap();
273 let expected = keys
274 .iter()
275 .filter_map(|key| {
276 if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
277 Some(key.region_id)
278 } else {
279 None
280 }
281 })
282 .collect::<Vec<_>>();
283 key_values.sort_by_key(|id| id.as_u64());
284 assert_eq!(key_values, expected);
285 }
286
287 #[test]
288 fn test_topic_region_map() {
289 let kv_backend = Arc::new(MemoryKvBackend::default());
290 let manager = TopicRegionManager::new(kv_backend.clone());
291
292 let table_id = 1;
293 let region_wal_options = (0..64)
294 .map(|i| {
295 let region_number = i;
296 let wal_options = if i % 2 == 0 {
297 WalOptions::Kafka(KafkaWalOptions {
298 topic: format!("topic_{}", i),
299 })
300 } else {
301 WalOptions::RaftEngine
302 };
303 (region_number, serde_json::to_string(&wal_options).unwrap())
304 })
305 .collect::<HashMap<_, _>>();
306
307 let region_wal_options = parse_region_wal_options(®ion_wal_options).unwrap();
308 let mut topic_region_mapping =
309 manager.get_topic_region_mapping(table_id, ®ion_wal_options);
310 let mut expected = (0..64)
311 .filter_map(|i| {
312 if i % 2 == 0 {
313 Some((RegionId::new(table_id, i), format!("topic_{}", i)))
314 } else {
315 None
316 }
317 })
318 .collect::<Vec<_>>();
319 topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
320 let topic_region_map = topic_region_mapping
321 .iter()
322 .map(|(region_id, topic)| (*region_id, topic.to_string()))
323 .collect::<Vec<_>>();
324 expected.sort_by_key(|(region_id, _)| region_id.as_u64());
325 assert_eq!(topic_region_map, expected);
326 }
327}