1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::ddl::utils::parse_region_wal_options;
25use crate::error::{Error, InvalidMetadataSnafu, Result};
26use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
27use crate::kv_backend::KvBackendRef;
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::rpc::KeyValue;
30use crate::rpc::store::{
31 BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
32};
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct TopicRegionKey<'a> {
38 pub region_id: RegionId,
39 pub topic: &'a str,
40}
41
42#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
44pub struct TopicRegionValue {
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub checkpoint: Option<ReplayCheckpoint>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
50pub struct ReplayCheckpoint {
51 #[serde(default)]
52 pub entry_id: u64,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub metadata_entry_id: Option<u64>,
55}
56
57impl<'a> TopicRegionKey<'a> {
58 pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59 Self { region_id, topic }
60 }
61
62 pub fn range_topic_key(topic: &str) -> String {
63 format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64 }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68 fn to_bytes(&self) -> Vec<u8> {
69 self.to_string().into_bytes()
70 }
71
72 fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73 let key = std::str::from_utf8(bytes).map_err(|e| {
74 InvalidMetadataSnafu {
75 err_msg: format!(
76 "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77 String::from_utf8_lossy(bytes)
78 ),
79 }
80 .build()
81 })?;
82 TopicRegionKey::try_from(key)
83 }
84}
85
86impl Display for TopicRegionKey<'_> {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 write!(
89 f,
90 "{}{}",
91 Self::range_topic_key(self.topic),
92 self.region_id.as_u64()
93 )
94 }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98 type Error = Error;
99
100 fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102 let captures = TOPIC_REGION_PATTERN
103 .captures(value)
104 .context(InvalidMetadataSnafu {
105 err_msg: format!("Invalid TopicRegionKey: {}", value),
106 })?;
107 let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108 let region_id = captures[2].parse::<u64>().map_err(|_| {
109 InvalidMetadataSnafu {
110 err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111 }
112 .build()
113 })?;
114 Ok(TopicRegionKey {
115 region_id: RegionId::from_u64(region_id),
116 topic,
117 })
118 }
119}
120
121impl ReplayCheckpoint {
122 pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124 Self {
125 entry_id,
126 metadata_entry_id,
127 }
128 }
129}
130
131impl TopicRegionValue {
132 pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
134 Self { checkpoint }
135 }
136
137 pub fn min_entry_id(&self) -> Option<u64> {
141 match self.checkpoint {
142 Some(ReplayCheckpoint {
143 entry_id,
144 metadata_entry_id,
145 }) => match metadata_entry_id {
146 Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
147 None => Some(entry_id),
148 },
149 None => None,
150 }
151 }
152}
153
154fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
155 let key = TopicRegionKey::from_bytes(&value.key)?;
156 let value = if value.value.is_empty() {
157 TopicRegionValue::default()
158 } else {
159 TopicRegionValue::try_from_raw_value(&value.value)?
160 };
161 Ok((key, value))
162}
163
164pub struct TopicRegionManager {
166 kv_backend: KvBackendRef,
167}
168
169impl TopicRegionManager {
170 pub fn new(kv_backend: KvBackendRef) -> Self {
171 Self { kv_backend }
172 }
173
174 pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
175 let put_req = PutRequest {
176 key: key.to_bytes(),
177 value: vec![],
178 prev_kv: false,
179 };
180 self.kv_backend.put(put_req).await?;
181 Ok(())
182 }
183
184 pub async fn batch_get(
185 &self,
186 keys: Vec<TopicRegionKey<'_>>,
187 ) -> Result<HashMap<RegionId, TopicRegionValue>> {
188 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
189 let req = BatchGetRequest { keys: raw_keys };
190 let resp = self.kv_backend.batch_get(req).await?;
191
192 let v = resp
193 .kvs
194 .into_iter()
195 .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
196 .collect::<Result<HashMap<_, _>>>()?;
197
198 Ok(v)
199 }
200
201 pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
202 let key_bytes = key.to_bytes();
203 let resp = self.kv_backend.get(&key_bytes).await?;
204 let value = resp
205 .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
206 .transpose()?;
207
208 Ok(value)
209 }
210
211 pub async fn batch_put(
212 &self,
213 keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
214 ) -> Result<()> {
215 let req = BatchPutRequest {
216 kvs: keys
217 .iter()
218 .map(|(key, value)| {
219 let value = value
220 .map(|v| v.try_as_raw_value())
221 .transpose()?
222 .unwrap_or_default();
223
224 Ok(KeyValue {
225 key: key.to_bytes(),
226 value,
227 })
228 })
229 .collect::<Result<Vec<_>>>()?,
230 prev_kv: false,
231 };
232 self.kv_backend.batch_put(req).await?;
233 Ok(())
234 }
235
236 pub fn build_create_txn(
238 &self,
239 table_id: TableId,
240 region_wal_options: &HashMap<RegionNumber, String>,
241 ) -> Result<Txn> {
242 let region_wal_options = parse_region_wal_options(region_wal_options)?;
243 let topic_region_mapping = self.get_topic_region_mapping(table_id, ®ion_wal_options);
244 let topic_region_keys = topic_region_mapping
245 .iter()
246 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
247 .collect::<Vec<_>>();
248 let operations = topic_region_keys
249 .into_iter()
250 .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
251 .collect::<Vec<_>>();
252 Ok(Txn::new().and_then(operations))
253 }
254
255 pub fn build_update_txn(
257 &self,
258 table_id: TableId,
259 old_region_wal_options: &HashMap<RegionNumber, String>,
260 new_region_wal_options: &HashMap<RegionNumber, String>,
261 ) -> Result<Txn> {
262 let old_wal_options_parsed = parse_region_wal_options(old_region_wal_options)?;
263 let new_wal_options_parsed = parse_region_wal_options(new_region_wal_options)?;
264 let old_mapping = self.get_topic_region_mapping(table_id, &old_wal_options_parsed);
265 let new_mapping = self.get_topic_region_mapping(table_id, &new_wal_options_parsed);
266
267 let old_map: HashMap<RegionId, &str> = old_mapping.into_iter().collect();
269 let new_map: HashMap<RegionId, &str> = new_mapping.into_iter().collect();
270 let mut ops = Vec::new();
271
272 for (region_id, old_topic) in &old_map {
274 match new_map.get(region_id) {
275 Some(new_topic) if *new_topic == *old_topic => {
276 }
278 _ => {
279 let key = TopicRegionKey::new(*region_id, old_topic);
281 ops.push(TxnOp::Delete(key.to_bytes()));
282 }
283 }
284 }
285
286 for (region_id, new_topic) in &new_map {
288 match old_map.get(region_id) {
289 Some(old_topic) if *old_topic == *new_topic => {
290 }
292 _ => {
293 let key = TopicRegionKey::new(*region_id, new_topic);
295 ops.push(TxnOp::Put(key.to_bytes(), vec![]));
297 }
298 }
299 }
300
301 Ok(Txn::new().and_then(ops))
302 }
303
304 pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
306 let prefix = TopicRegionKey::range_topic_key(topic);
307 let req = RangeRequest::new().with_prefix(prefix.as_bytes());
308 let resp = self.kv_backend.range(req).await?;
309 let region_ids = resp
310 .kvs
311 .iter()
312 .map(topic_region_decoder)
313 .collect::<Result<Vec<_>>>()?;
314 Ok(region_ids
315 .into_iter()
316 .map(|(key, value)| (key.region_id, value))
317 .collect())
318 }
319
320 pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
321 let raw_key = key.to_bytes();
322 self.kv_backend.delete(&raw_key, false).await?;
323 Ok(())
324 }
325
326 pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
327 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
328 let req = BatchDeleteRequest {
329 keys: raw_keys,
330 prev_kv: false,
331 };
332 self.kv_backend.batch_delete(req).await?;
333 Ok(())
334 }
335
336 pub fn get_topic_region_mapping<'a>(
342 &self,
343 table_id: TableId,
344 region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
345 ) -> Vec<(RegionId, &'a str)> {
346 region_wal_options
347 .keys()
348 .filter_map(
349 |region_number| match region_wal_options.get(region_number) {
350 Some(WalOptions::Kafka(kafka)) => {
351 let region_id = RegionId::new(table_id, *region_number);
352 Some((region_id, kafka.topic.as_str()))
353 }
354 Some(WalOptions::RaftEngine) => None,
355 Some(WalOptions::Noop) => None,
356 None => None,
357 },
358 )
359 .collect::<Vec<_>>()
360 }
361}
362
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_wal::options::KafkaWalOptions;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Builds a manager backed by a fresh in-memory key-value store.
    fn new_manager() -> TopicRegionManager {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        TopicRegionManager::new(kv_backend)
    }

    /// Kafka WAL options pointing at `topic`.
    fn kafka(topic: &str) -> WalOptions {
        WalOptions::Kafka(KafkaWalOptions {
            topic: topic.to_string(),
        })
    }

    /// Serializes each region's WAL options to JSON, the form the transaction
    /// builders accept.
    fn encode_wal_options(
        options: impl IntoIterator<Item = (RegionNumber, WalOptions)>,
    ) -> HashMap<RegionNumber, String> {
        options
            .into_iter()
            .map(|(region_number, wal_options)| {
                (region_number, serde_json::to_string(&wal_options).unwrap())
            })
            .collect()
    }

    /// Decodes the raw key of a transaction operation, panicking on a malformed key.
    fn decode_key(raw: &[u8]) -> TopicRegionKey<'_> {
        TopicRegionKey::from_bytes(raw).unwrap()
    }

    /// Returns the region ids stored under `topic`, sorted by their `u64` form.
    async fn sorted_region_ids(manager: &TopicRegionManager, topic: &str) -> Vec<RegionId> {
        let mut region_ids = manager
            .regions(topic)
            .await
            .unwrap()
            .into_keys()
            .collect::<Vec<_>>();
        region_ids.sort_by_key(|id| id.as_u64());
        region_ids
    }

    #[tokio::test]
    async fn test_topic_region_manager() {
        let manager = new_manager();

        // 64 regions spread round-robin over 16 topics.
        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
        let keys = (0..64)
            .map(|i| {
                let topic = &topics[(i % 16) as usize];
                (TopicRegionKey::new(RegionId::from_u64(i), topic), None)
            })
            .collect::<Vec<_>>();

        manager.batch_put(&keys).await.unwrap();
        let key_values = sorted_region_ids(&manager, &topics[0]).await;
        let expected = keys
            .iter()
            .filter(|(key, _)| key.topic == topics[0])
            .map(|(key, _)| key.region_id)
            .collect::<Vec<_>>();
        assert_eq!(key_values, expected);

        // After deleting region 0, the remaining regions of the topic are still listed.
        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
        manager.delete(key).await.unwrap();
        let key_values = sorted_region_ids(&manager, &topics[0]).await;
        let expected = keys
            .iter()
            .filter(|(key, _)| key.topic == topics[0] && key.region_id != RegionId::from_u64(0))
            .map(|(key, _)| key.region_id)
            .collect::<Vec<_>>();
        assert_eq!(key_values, expected);
    }

    #[test]
    fn test_topic_region_map() {
        let manager = new_manager();

        let table_id = 1;
        // Even regions use Kafka, odd regions use the raft engine.
        let region_wal_options = encode_wal_options((0..64).map(|i| {
            let wal_options = if i % 2 == 0 {
                kafka(&format!("topic_{}", i))
            } else {
                WalOptions::RaftEngine
            };
            (i, wal_options)
        }));

        let region_wal_options = parse_region_wal_options(&region_wal_options).unwrap();
        let mut topic_region_mapping =
            manager.get_topic_region_mapping(table_id, &region_wal_options);
        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
        let topic_region_map = topic_region_mapping
            .iter()
            .map(|(region_id, topic)| (*region_id, topic.to_string()))
            .collect::<Vec<_>>();

        let mut expected = (0..64)
            .filter(|i| i % 2 == 0)
            .map(|i| (RegionId::new(table_id, i), format!("topic_{}", i)))
            .collect::<Vec<_>>();
        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
        assert_eq!(topic_region_map, expected);
    }

    #[test]
    fn test_topic_region_key_is_match() {
        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
        assert_eq!(
            topic_region_key.topic,
            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
        );
        assert_eq!(
            topic_region_key.region_id,
            RegionId::from_u64(4410931412992)
        );
    }

    #[test]
    fn test_build_create_txn() {
        let manager = new_manager();
        let table_id = 1;
        let region_wal_options = encode_wal_options(vec![
            (0, kafka("topic_0")),
            (1, kafka("topic_1")),
            (2, WalOptions::RaftEngine),
        ]);

        let txn = manager
            .build_create_txn(table_id, &region_wal_options)
            .unwrap();

        // Only the two Kafka regions produce an operation.
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 2);

        let keys: Vec<_> = ops
            .iter()
            .filter_map(|op| match op {
                TxnOp::Put(key, _) => TopicRegionKey::from_bytes(key).ok(),
                _ => None,
            })
            .collect();

        assert_eq!(keys.len(), 2);
        let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect();
        assert!(region_ids.contains(&RegionId::new(table_id, 0)));
        assert!(region_ids.contains(&RegionId::new(table_id, 1)));
        assert!(!region_ids.contains(&RegionId::new(table_id, 2)));

        for key in keys {
            match key.region_id.region_number() {
                0 => assert_eq!(key.topic, "topic_0"),
                1 => assert_eq!(key.topic, "topic_1"),
                _ => panic!("Unexpected region number"),
            }
        }
    }

    #[test]
    fn test_build_update_txn_add_new_region() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options = encode_wal_options(vec![(0, kafka("topic_0"))]);
        let new_region_wal_options =
            encode_wal_options(vec![(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 1);
        match &ops[0] {
            TxnOp::Put(key, _) => {
                let topic_key = decode_key(key);
                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
                assert_eq!(topic_key.topic, "topic_1");
            }
            TxnOp::Delete(_) | TxnOp::Get(_) => {
                panic!("Expected Put operation");
            }
        }
    }

    #[test]
    fn test_build_update_txn_remove_region() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options =
            encode_wal_options(vec![(0, kafka("topic_0")), (1, kafka("topic_1"))]);
        let new_region_wal_options = encode_wal_options(vec![(0, kafka("topic_0"))]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 1);
        match &ops[0] {
            TxnOp::Delete(key) => {
                let topic_key = decode_key(key);
                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
                assert_eq!(topic_key.topic, "topic_1");
            }
            TxnOp::Put(_, _) | TxnOp::Get(_) => {
                panic!("Expected Delete operation");
            }
        }
    }

    #[test]
    fn test_build_update_txn_change_topic() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options = encode_wal_options(vec![(0, kafka("topic_0"))]);
        let new_region_wal_options = encode_wal_options(vec![(0, kafka("topic_0_new"))]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 2);

        let mut delete_found = false;
        let mut put_found = false;
        for op in ops {
            match op {
                TxnOp::Delete(key) => {
                    let topic_key = decode_key(&key);
                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
                    assert_eq!(topic_key.topic, "topic_0");
                    delete_found = true;
                }
                TxnOp::Put(key, _) => {
                    let topic_key = decode_key(&key);
                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
                    assert_eq!(topic_key.topic, "topic_0_new");
                    put_found = true;
                }
                TxnOp::Get(_) => {
                    panic!("Unexpected Get operation in update transaction");
                }
            }
        }
        assert!(delete_found, "Expected Delete operation for old topic");
        assert!(put_found, "Expected Put operation for new topic");
    }

    #[test]
    fn test_build_update_txn_no_change() {
        let manager = new_manager();
        let table_id = 1;
        let region_wal_options =
            encode_wal_options(vec![(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(table_id, &region_wal_options, &region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 0);
    }

    #[test]
    fn test_build_update_txn_mixed_scenarios() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options = encode_wal_options(vec![
            (0, kafka("topic_0")),
            (1, kafka("topic_1")),
            (2, kafka("topic_2")),
        ]);
        // Region 0 is unchanged, region 1 changes topic, region 2 is removed,
        // region 3 is added.
        let new_region_wal_options = encode_wal_options(vec![
            (0, kafka("topic_0")),
            (1, kafka("topic_1_new")),
            (3, kafka("topic_3")),
        ]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();

        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 4);

        let mut delete_ops = 0;
        let mut put_ops = 0;
        let mut delete_region_2 = false;
        let mut delete_region_1_old = false;
        let mut put_region_1_new = false;
        let mut put_region_3 = false;

        for op in ops {
            match op {
                TxnOp::Delete(key) => {
                    delete_ops += 1;
                    let topic_key = decode_key(&key);
                    match topic_key.region_id.region_number() {
                        1 => {
                            assert_eq!(topic_key.topic, "topic_1");
                            delete_region_1_old = true;
                        }
                        2 => {
                            assert_eq!(topic_key.topic, "topic_2");
                            delete_region_2 = true;
                        }
                        _ => panic!("Unexpected delete operation for region"),
                    }
                }
                TxnOp::Put(key, _) => {
                    put_ops += 1;
                    let topic_key = decode_key(&key);
                    match topic_key.region_id.region_number() {
                        1 => {
                            assert_eq!(topic_key.topic, "topic_1_new");
                            put_region_1_new = true;
                        }
                        3 => {
                            assert_eq!(topic_key.topic, "topic_3");
                            put_region_3 = true;
                        }
                        _ => panic!("Unexpected put operation for region"),
                    }
                }
                TxnOp::Get(_) => {
                    panic!("Unexpected Get operation in update transaction");
                }
            }
        }

        assert_eq!(delete_ops, 2);
        assert_eq!(put_ops, 2);
        assert!(delete_region_2, "Expected delete for removed region 2");
        assert!(
            delete_region_1_old,
            "Expected delete for region 1 old topic"
        );
        assert!(put_region_1_new, "Expected put for region 1 new topic");
        assert!(put_region_3, "Expected put for new region 3");
    }

    #[test]
    fn test_build_update_txn_with_raft_engine() {
        let manager = new_manager();
        let table_id = 1;
        // Region 1 moves from the raft engine to Kafka, so only a Put is expected.
        let old_region_wal_options =
            encode_wal_options(vec![(0, kafka("topic_0")), (1, WalOptions::RaftEngine)]);
        let new_region_wal_options =
            encode_wal_options(vec![(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 1);
        match &ops[0] {
            TxnOp::Put(key, _) => {
                let topic_key = decode_key(key);
                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
                assert_eq!(topic_key.topic, "topic_1");
            }
            TxnOp::Delete(_) | TxnOp::Get(_) => {
                panic!("Expected Put operation for new Kafka region");
            }
        }
    }
}