1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::ddl::utils::parse_region_wal_options;
25use crate::error::{Error, InvalidMetadataSnafu, Result};
26use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
27use crate::kv_backend::KvBackendRef;
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::rpc::KeyValue;
30use crate::rpc::store::{
31 BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
32};
33
34#[derive(Debug, Clone, PartialEq)]
37pub struct TopicRegionKey<'a> {
38 pub region_id: RegionId,
39 pub topic: &'a str,
40}
41
42#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
44pub struct TopicRegionValue {
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub checkpoint: Option<ReplayCheckpoint>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
50pub struct ReplayCheckpoint {
51 #[serde(default)]
52 pub entry_id: u64,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub metadata_entry_id: Option<u64>,
55}
56
57impl<'a> TopicRegionKey<'a> {
58 pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59 Self { region_id, topic }
60 }
61
62 pub fn range_topic_key(topic: &str) -> String {
63 format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64 }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68 fn to_bytes(&self) -> Vec<u8> {
69 self.to_string().into_bytes()
70 }
71
72 fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73 let key = std::str::from_utf8(bytes).map_err(|e| {
74 InvalidMetadataSnafu {
75 err_msg: format!(
76 "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77 String::from_utf8_lossy(bytes)
78 ),
79 }
80 .build()
81 })?;
82 TopicRegionKey::try_from(key)
83 }
84}
85
86impl Display for TopicRegionKey<'_> {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 write!(
89 f,
90 "{}{}",
91 Self::range_topic_key(self.topic),
92 self.region_id.as_u64()
93 )
94 }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98 type Error = Error;
99
100 fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102 let captures = TOPIC_REGION_PATTERN
103 .captures(value)
104 .context(InvalidMetadataSnafu {
105 err_msg: format!("Invalid TopicRegionKey: {}", value),
106 })?;
107 let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108 let region_id = captures[2].parse::<u64>().map_err(|_| {
109 InvalidMetadataSnafu {
110 err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111 }
112 .build()
113 })?;
114 Ok(TopicRegionKey {
115 region_id: RegionId::from_u64(region_id),
116 topic,
117 })
118 }
119}
120
121impl ReplayCheckpoint {
122 pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124 Self {
125 entry_id,
126 metadata_entry_id,
127 }
128 }
129}
130
131impl TopicRegionValue {
132 pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
134 Self { checkpoint }
135 }
136
137 pub fn min_entry_id(&self) -> Option<u64> {
141 match self.checkpoint {
142 Some(ReplayCheckpoint {
143 entry_id,
144 metadata_entry_id,
145 }) => match metadata_entry_id {
146 Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
147 None => Some(entry_id),
148 },
149 None => None,
150 }
151 }
152}
153
154fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
155 let key = TopicRegionKey::from_bytes(&value.key)?;
156 let value = if value.value.is_empty() {
157 TopicRegionValue::default()
158 } else {
159 TopicRegionValue::try_from_raw_value(&value.value)?
160 };
161 Ok((key, value))
162}
163
164pub struct TopicRegionManager {
166 kv_backend: KvBackendRef,
167}
168
169impl TopicRegionManager {
170 pub fn new(kv_backend: KvBackendRef) -> Self {
171 Self { kv_backend }
172 }
173
174 pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
175 let put_req = PutRequest {
176 key: key.to_bytes(),
177 value: vec![],
178 prev_kv: false,
179 };
180 self.kv_backend.put(put_req).await?;
181 Ok(())
182 }
183
184 pub async fn batch_get(
185 &self,
186 keys: Vec<TopicRegionKey<'_>>,
187 ) -> Result<HashMap<RegionId, TopicRegionValue>> {
188 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
189 let req = BatchGetRequest { keys: raw_keys };
190 let resp = self.kv_backend.batch_get(req).await?;
191
192 let v = resp
193 .kvs
194 .into_iter()
195 .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
196 .collect::<Result<HashMap<_, _>>>()?;
197
198 Ok(v)
199 }
200
201 pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
202 let key_bytes = key.to_bytes();
203 let resp = self.kv_backend.get(&key_bytes).await?;
204 let value = resp
205 .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
206 .transpose()?;
207
208 Ok(value)
209 }
210
211 pub async fn batch_put(
212 &self,
213 keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
214 ) -> Result<()> {
215 let req = BatchPutRequest {
216 kvs: keys
217 .iter()
218 .map(|(key, value)| {
219 let value = value
220 .map(|v| v.try_as_raw_value())
221 .transpose()?
222 .unwrap_or_default();
223
224 Ok(KeyValue {
225 key: key.to_bytes(),
226 value,
227 })
228 })
229 .collect::<Result<Vec<_>>>()?,
230 prev_kv: false,
231 };
232 self.kv_backend.batch_put(req).await?;
233 Ok(())
234 }
235
236 pub fn build_create_txn(
238 &self,
239 table_id: TableId,
240 region_wal_options: &HashMap<RegionNumber, String>,
241 ) -> Result<Txn> {
242 let region_wal_options = parse_region_wal_options(region_wal_options)?;
243 let topic_region_mapping = self.get_topic_region_mapping(table_id, ®ion_wal_options);
244 let topic_region_keys = topic_region_mapping
245 .iter()
246 .map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id))
247 .collect::<Vec<_>>();
248 let operations = topic_region_keys
249 .into_iter()
250 .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
251 .collect::<Vec<_>>();
252 Ok(Txn::new().and_then(operations))
253 }
254
255 pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
257 let prefix = TopicRegionKey::range_topic_key(topic);
258 let req = RangeRequest::new().with_prefix(prefix.as_bytes());
259 let resp = self.kv_backend.range(req).await?;
260 let region_ids = resp
261 .kvs
262 .iter()
263 .map(topic_region_decoder)
264 .collect::<Result<Vec<_>>>()?;
265 Ok(region_ids
266 .into_iter()
267 .map(|(key, value)| (key.region_id, value))
268 .collect())
269 }
270
271 pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
272 let raw_key = key.to_bytes();
273 self.kv_backend.delete(&raw_key, false).await?;
274 Ok(())
275 }
276
277 pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
278 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
279 let req = BatchDeleteRequest {
280 keys: raw_keys,
281 prev_kv: false,
282 };
283 self.kv_backend.batch_delete(req).await?;
284 Ok(())
285 }
286
287 pub fn get_topic_region_mapping<'a>(
293 &self,
294 table_id: TableId,
295 region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
296 ) -> Vec<(RegionId, &'a str)> {
297 region_wal_options
298 .keys()
299 .filter_map(
300 |region_number| match region_wal_options.get(region_number) {
301 Some(WalOptions::Kafka(kafka)) => {
302 let region_id = RegionId::new(table_id, *region_number);
303 Some((region_id, kafka.topic.as_str()))
304 }
305 Some(WalOptions::RaftEngine) => None,
306 Some(WalOptions::Noop) => None,
307 None => None,
308 },
309 )
310 .collect::<Vec<_>>()
311 }
312}
313
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_wal::options::KafkaWalOptions;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Lists the region ids stored under `topic`, sorted by numeric id so the
    /// result can be compared against a deterministic expectation.
    async fn sorted_region_ids(manager: &TopicRegionManager, topic: &str) -> Vec<RegionId> {
        let mut region_ids = manager
            .regions(topic)
            .await
            .unwrap()
            .into_keys()
            .collect::<Vec<_>>();
        region_ids.sort_by_key(|id| id.as_u64());
        region_ids
    }

    /// Puts 64 regions spread over 16 topics, then checks listing by topic
    /// before and after deleting one entry.
    #[tokio::test]
    async fn test_topic_region_manager() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
        let keys = (0..64)
            .map(|i| {
                (
                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
                    None,
                )
            })
            .collect::<Vec<_>>();

        manager.batch_put(&keys).await.unwrap();
        let key_values = sorted_region_ids(&manager, &topics[0]).await;
        // `keys` was generated in ascending region id order, so the filtered
        // expectation is already sorted.
        let expected = keys
            .iter()
            .filter(|(key, _)| key.topic == topics[0])
            .map(|(key, _)| key.region_id)
            .collect::<Vec<_>>();
        assert_eq!(key_values, expected);

        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
        manager.delete(key.clone()).await.unwrap();
        let key_values = sorted_region_ids(&manager, &topics[0]).await;
        let expected = keys
            .iter()
            .filter(|(key, _)| key.topic == topics[0] && key.region_id != RegionId::from_u64(0))
            .map(|(key, _)| key.region_id)
            .collect::<Vec<_>>();
        assert_eq!(key_values, expected);
    }

    /// Only regions configured with Kafka WAL options appear in the mapping.
    #[test]
    fn test_topic_region_map() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        let table_id = 1;
        // Even region numbers use Kafka, odd ones use the raft engine.
        let region_wal_options = (0..64)
            .map(|i| {
                let region_number = i;
                let wal_options = if i % 2 == 0 {
                    WalOptions::Kafka(KafkaWalOptions {
                        topic: format!("topic_{}", i),
                    })
                } else {
                    WalOptions::RaftEngine
                };
                (region_number, serde_json::to_string(&wal_options).unwrap())
            })
            .collect::<HashMap<_, _>>();

        let region_wal_options = parse_region_wal_options(&region_wal_options).unwrap();
        let mut topic_region_mapping =
            manager.get_topic_region_mapping(table_id, &region_wal_options);
        let mut expected = (0..64)
            .filter(|i| i % 2 == 0)
            .map(|i| (RegionId::new(table_id, i), format!("topic_{}", i)))
            .collect::<Vec<_>>();
        // The mapping follows hash map iteration order; sort both sides.
        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
        let topic_region_map = topic_region_mapping
            .iter()
            .map(|(region_id, topic)| (*region_id, topic.to_string()))
            .collect::<Vec<_>>();
        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
        assert_eq!(topic_region_map, expected);
    }

    /// A well-formed key string parses into its topic and region id.
    #[test]
    fn test_topic_region_key_is_match() {
        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
        assert_eq!(
            topic_region_key.topic,
            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
        );
        assert_eq!(
            topic_region_key.region_id,
            RegionId::from_u64(4410931412992)
        );
    }
}