1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::ddl::utils::parse_region_wal_options;
25use crate::error::{Error, InvalidMetadataSnafu, Result};
26use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
27use crate::kv_backend::KvBackendRef;
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::rpc::KeyValue;
30use crate::rpc::store::{
31 BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
32};
33
/// Key that associates a region with a topic.
///
/// Its string form is `{TOPIC_REGION_PREFIX}/{topic}/{region_id}`, where
/// `region_id` is rendered as its `u64` representation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicRegionKey<'a> {
    /// The region this key refers to.
    pub region_id: RegionId,
    /// The topic name, borrowed for the lifetime `'a`.
    pub topic: &'a str,
}
41
/// Value stored under a [`TopicRegionKey`].
///
/// The default value carries no checkpoint.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub struct TopicRegionValue {
    /// Optional replay checkpoint; left out of the serialized form when `None`
    /// and defaulted to `None` when absent on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint: Option<ReplayCheckpoint>,
}
48
/// A pair of entry ids recorded as a replay checkpoint.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint; defaults to `0` when absent on deserialization.
    #[serde(default)]
    pub entry_id: u64,
    /// Optional second entry id; left out of the serialized form when `None`.
    // NOTE(review): presumably the entry id of the metric engine's metadata
    // region (see `is_metric_engine` in `merge_with_topic_pruned_entry_id`) — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata_entry_id: Option<u64>,
}
56
57impl<'a> TopicRegionKey<'a> {
58 pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59 Self { region_id, topic }
60 }
61
62 pub fn range_topic_key(topic: &str) -> String {
63 format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64 }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68 fn to_bytes(&self) -> Vec<u8> {
69 self.to_string().into_bytes()
70 }
71
72 fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73 let key = std::str::from_utf8(bytes).map_err(|e| {
74 InvalidMetadataSnafu {
75 err_msg: format!(
76 "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77 String::from_utf8_lossy(bytes)
78 ),
79 }
80 .build()
81 })?;
82 TopicRegionKey::try_from(key)
83 }
84}
85
86impl Display for TopicRegionKey<'_> {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 write!(
89 f,
90 "{}{}",
91 Self::range_topic_key(self.topic),
92 self.region_id.as_u64()
93 )
94 }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98 type Error = Error;
99
100 fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102 let captures = TOPIC_REGION_PATTERN
103 .captures(value)
104 .context(InvalidMetadataSnafu {
105 err_msg: format!("Invalid TopicRegionKey: {}", value),
106 })?;
107 let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108 let region_id = captures[2].parse::<u64>().map_err(|_| {
109 InvalidMetadataSnafu {
110 err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111 }
112 .build()
113 })?;
114 Ok(TopicRegionKey {
115 region_id: RegionId::from_u64(region_id),
116 topic,
117 })
118 }
119}
120
121impl ReplayCheckpoint {
122 pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124 Self {
125 entry_id,
126 metadata_entry_id,
127 }
128 }
129
130 pub fn merge_with_topic_pruned_entry_id(
132 checkpoint: Option<Self>,
133 pruned_entry_id: Option<u64>,
134 is_metric_engine: bool,
135 ) -> Option<Self> {
136 match (checkpoint, pruned_entry_id) {
137 (Some(checkpoint), Some(pruned_entry_id)) => Some(Self {
138 entry_id: checkpoint.entry_id.max(pruned_entry_id),
139 metadata_entry_id: if is_metric_engine {
140 Some(
141 checkpoint
142 .metadata_entry_id
143 .unwrap_or_default()
144 .max(pruned_entry_id),
145 )
146 } else {
147 checkpoint.metadata_entry_id
148 },
149 }),
150 (None, Some(pruned_entry_id)) => Some(Self {
151 entry_id: pruned_entry_id,
152 metadata_entry_id: is_metric_engine.then_some(pruned_entry_id),
153 }),
154 (checkpoint, None) => checkpoint,
155 }
156 }
157}
158
159impl TopicRegionValue {
160 pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
162 Self { checkpoint }
163 }
164
165 pub fn min_entry_id(&self) -> Option<u64> {
169 match self.checkpoint {
170 Some(ReplayCheckpoint {
171 entry_id,
172 metadata_entry_id,
173 }) => match metadata_entry_id {
174 Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
175 None => Some(entry_id),
176 },
177 None => None,
178 }
179 }
180}
181
182fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
183 let key = TopicRegionKey::from_bytes(&value.key)?;
184 let value = if value.value.is_empty() {
185 TopicRegionValue::default()
186 } else {
187 TopicRegionValue::try_from_raw_value(&value.value)?
188 };
189 Ok((key, value))
190}
191
/// Reads and writes topic-region mappings through a key-value backend.
pub struct TopicRegionManager {
    /// Backend that all reads and writes go through.
    kv_backend: KvBackendRef,
}
196
197impl TopicRegionManager {
198 pub fn new(kv_backend: KvBackendRef) -> Self {
199 Self { kv_backend }
200 }
201
202 pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
203 let put_req = PutRequest {
204 key: key.to_bytes(),
205 value: vec![],
206 prev_kv: false,
207 };
208 self.kv_backend.put(put_req).await?;
209 Ok(())
210 }
211
212 pub async fn batch_get(
213 &self,
214 keys: Vec<TopicRegionKey<'_>>,
215 ) -> Result<HashMap<RegionId, TopicRegionValue>> {
216 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
217 let req = BatchGetRequest { keys: raw_keys };
218 let resp = self.kv_backend.batch_get(req).await?;
219
220 let v = resp
221 .kvs
222 .into_iter()
223 .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
224 .collect::<Result<HashMap<_, _>>>()?;
225
226 Ok(v)
227 }
228
229 pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
230 let key_bytes = key.to_bytes();
231 let resp = self.kv_backend.get(&key_bytes).await?;
232 let value = resp
233 .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
234 .transpose()?;
235
236 Ok(value)
237 }
238
239 pub async fn batch_put(
240 &self,
241 keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
242 ) -> Result<()> {
243 let req = BatchPutRequest {
244 kvs: keys
245 .iter()
246 .map(|(key, value)| {
247 let value = value
248 .map(|v| v.try_as_raw_value())
249 .transpose()?
250 .unwrap_or_default();
251
252 Ok(KeyValue {
253 key: key.to_bytes(),
254 value,
255 })
256 })
257 .collect::<Result<Vec<_>>>()?,
258 prev_kv: false,
259 };
260 self.kv_backend.batch_put(req).await?;
261 Ok(())
262 }
263
264 pub fn build_create_txn(
266 &self,
267 table_id: TableId,
268 region_wal_options: &HashMap<RegionNumber, String>,
269 ) -> Result<Txn> {
270 let region_wal_options = parse_region_wal_options(region_wal_options)?;
271 let topic_region_mapping = self.get_topic_region_mapping(table_id, ®ion_wal_options);
272 let topic_region_keys = topic_region_mapping
273 .iter()
274 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
275 .collect::<Vec<_>>();
276 let operations = topic_region_keys
277 .into_iter()
278 .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
279 .collect::<Vec<_>>();
280 Ok(Txn::new().and_then(operations))
281 }
282
283 pub fn build_update_txn(
285 &self,
286 table_id: TableId,
287 old_region_wal_options: &HashMap<RegionNumber, String>,
288 new_region_wal_options: &HashMap<RegionNumber, String>,
289 ) -> Result<Txn> {
290 let old_wal_options_parsed = parse_region_wal_options(old_region_wal_options)?;
291 let new_wal_options_parsed = parse_region_wal_options(new_region_wal_options)?;
292 let old_mapping = self.get_topic_region_mapping(table_id, &old_wal_options_parsed);
293 let new_mapping = self.get_topic_region_mapping(table_id, &new_wal_options_parsed);
294
295 let old_map: HashMap<RegionId, &str> = old_mapping.into_iter().collect();
297 let new_map: HashMap<RegionId, &str> = new_mapping.into_iter().collect();
298 let mut ops = Vec::new();
299
300 for (region_id, old_topic) in &old_map {
302 match new_map.get(region_id) {
303 Some(new_topic) if *new_topic == *old_topic => {
304 }
306 _ => {
307 let key = TopicRegionKey::new(*region_id, old_topic);
309 ops.push(TxnOp::Delete(key.to_bytes()));
310 }
311 }
312 }
313
314 for (region_id, new_topic) in &new_map {
316 match old_map.get(region_id) {
317 Some(old_topic) if *old_topic == *new_topic => {
318 }
320 _ => {
321 let key = TopicRegionKey::new(*region_id, new_topic);
323 ops.push(TxnOp::Put(key.to_bytes(), vec![]));
325 }
326 }
327 }
328
329 Ok(Txn::new().and_then(ops))
330 }
331
332 pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
334 let prefix = TopicRegionKey::range_topic_key(topic);
335 let req = RangeRequest::new().with_prefix(prefix.as_bytes());
336 let resp = self.kv_backend.range(req).await?;
337 let region_ids = resp
338 .kvs
339 .iter()
340 .map(topic_region_decoder)
341 .collect::<Result<Vec<_>>>()?;
342 Ok(region_ids
343 .into_iter()
344 .map(|(key, value)| (key.region_id, value))
345 .collect())
346 }
347
348 pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
349 let raw_key = key.to_bytes();
350 self.kv_backend.delete(&raw_key, false).await?;
351 Ok(())
352 }
353
354 pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
355 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
356 let req = BatchDeleteRequest {
357 keys: raw_keys,
358 prev_kv: false,
359 };
360 self.kv_backend.batch_delete(req).await?;
361 Ok(())
362 }
363
364 pub fn get_topic_region_mapping<'a>(
370 &self,
371 table_id: TableId,
372 region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
373 ) -> Vec<(RegionId, &'a str)> {
374 region_wal_options
375 .keys()
376 .filter_map(
377 |region_number| match region_wal_options.get(region_number) {
378 Some(WalOptions::Kafka(kafka)) => {
379 let region_id = RegionId::new(table_id, *region_number);
380 Some((region_id, kafka.topic.as_str()))
381 }
382 Some(WalOptions::RaftEngine) => None,
383 Some(WalOptions::Noop) => None,
384 None => None,
385 },
386 )
387 .collect::<Vec<_>>()
388 }
389}
390
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_wal::options::KafkaWalOptions;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Table id shared by every transaction-building test.
    const TABLE_ID: TableId = 1;

    /// Builds a manager on top of a fresh in-memory backend.
    fn new_manager() -> TopicRegionManager {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        TopicRegionManager::new(kv_backend)
    }

    /// Kafka WAL options for `topic`.
    fn kafka(topic: &str) -> WalOptions {
        WalOptions::Kafka(KafkaWalOptions {
            topic: topic.to_string(),
        })
    }

    /// Serializes per-region WAL options into the JSON-string map the
    /// transaction builders take.
    fn encode(
        options: impl IntoIterator<Item = (RegionNumber, WalOptions)>,
    ) -> HashMap<RegionNumber, String> {
        options
            .into_iter()
            .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
            .collect()
    }

    /// Expected `(region id, topic)` pairs of `TABLE_ID` for the given region numbers.
    fn targets(pairs: &[(RegionNumber, &str)]) -> Vec<(RegionId, String)> {
        pairs
            .iter()
            .map(|(number, topic)| (RegionId::new(TABLE_ID, *number), topic.to_string()))
            .collect()
    }

    /// Decodes the success branch of `txn` into `(deletes, puts)`, each sorted
    /// by region id so that assertions do not depend on map iteration order.
    ///
    /// Panics on a `Get` operation, which none of the builders should emit.
    fn decode_success_ops(txn: &Txn) -> (Vec<(RegionId, String)>, Vec<(RegionId, String)>) {
        fn decode(raw: &[u8]) -> (RegionId, String) {
            let key = TopicRegionKey::from_bytes(raw).unwrap();
            (key.region_id, key.topic.to_string())
        }

        let mut deletes = Vec::new();
        let mut puts = Vec::new();
        for op in &txn.req().success {
            match op {
                TxnOp::Delete(key) => deletes.push(decode(key)),
                TxnOp::Put(key, _) => puts.push(decode(key)),
                TxnOp::Get(_) => panic!("Unexpected Get operation in transaction"),
            }
        }
        deletes.sort_by_key(|(region_id, _)| region_id.as_u64());
        puts.sort_by_key(|(region_id, _)| region_id.as_u64());
        (deletes, puts)
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_missing_pruned() {
        let checkpoint = Some(ReplayCheckpoint::new(10, None));

        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(checkpoint, None, true),
            checkpoint
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_creates_checkpoint() {
        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), true),
            Some(ReplayCheckpoint::new(10, Some(10)))
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_updates_both_ids() {
        let checkpoint = ReplayCheckpoint::new(10, Some(5));

        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
            Some(ReplayCheckpoint::new(20, Some(20)))
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_preserves_larger_ids() {
        let checkpoint = ReplayCheckpoint::new(30, Some(40));

        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
            Some(checkpoint)
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_for_mito() {
        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), false),
            Some(ReplayCheckpoint::new(10, None))
        );

        let checkpoint = ReplayCheckpoint::new(5, Some(8));
        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(10), false),
            Some(ReplayCheckpoint::new(10, Some(8)))
        );
    }

    #[tokio::test]
    async fn test_topic_region_manager() {
        /// Region ids stored under `topic`, sorted ascending.
        async fn sorted_regions(manager: &TopicRegionManager, topic: &str) -> Vec<RegionId> {
            let mut region_ids = manager
                .regions(topic)
                .await
                .unwrap()
                .into_keys()
                .collect::<Vec<_>>();
            region_ids.sort_by_key(|id| id.as_u64());
            region_ids
        }

        let manager = new_manager();

        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
        let keys = (0..64)
            .map(|i| {
                (
                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
                    None,
                )
            })
            .collect::<Vec<_>>();

        manager.batch_put(&keys).await.unwrap();

        // `keys` is generated in ascending region id order, so the expected
        // lists below are already sorted.
        let expected = keys
            .iter()
            .filter(|(key, _)| key.topic == topics[0])
            .map(|(key, _)| key.region_id)
            .collect::<Vec<_>>();
        assert_eq!(sorted_regions(&manager, &topics[0]).await, expected);

        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
        manager.delete(key).await.unwrap();

        let expected = keys
            .iter()
            .filter(|(key, _)| key.topic == topics[0] && key.region_id != RegionId::from_u64(0))
            .map(|(key, _)| key.region_id)
            .collect::<Vec<_>>();
        assert_eq!(sorted_regions(&manager, &topics[0]).await, expected);
    }

    #[test]
    fn test_topic_region_map() {
        let manager = new_manager();

        // Even regions use Kafka, odd regions use the raft engine.
        let region_wal_options = encode((0..64).map(|i| {
            let wal_options = if i % 2 == 0 {
                kafka(&format!("topic_{}", i))
            } else {
                WalOptions::RaftEngine
            };
            (i, wal_options)
        }));

        let region_wal_options = parse_region_wal_options(&region_wal_options).unwrap();
        let mut topic_region_mapping =
            manager.get_topic_region_mapping(TABLE_ID, &region_wal_options);
        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
        let topic_region_map = topic_region_mapping
            .iter()
            .map(|(region_id, topic)| (*region_id, topic.to_string()))
            .collect::<Vec<_>>();

        let mut expected = (0..64)
            .filter(|i| i % 2 == 0)
            .map(|i| (RegionId::new(TABLE_ID, i), format!("topic_{}", i)))
            .collect::<Vec<_>>();
        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
        assert_eq!(topic_region_map, expected);
    }

    #[test]
    fn test_topic_region_key_is_match() {
        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
        assert_eq!(
            topic_region_key.topic,
            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
        );
        assert_eq!(
            topic_region_key.region_id,
            RegionId::from_u64(4410931412992)
        );
    }

    #[test]
    fn test_build_create_txn() {
        let manager = new_manager();
        let region_wal_options = encode([
            (0, kafka("topic_0")),
            (1, kafka("topic_1")),
            // Raft engine regions must not produce a key.
            (2, WalOptions::RaftEngine),
        ]);

        let txn = manager
            .build_create_txn(TABLE_ID, &region_wal_options)
            .unwrap();

        let (deletes, puts) = decode_success_ops(&txn);
        assert!(deletes.is_empty());
        assert_eq!(puts, targets(&[(0, "topic_0"), (1, "topic_1")]));
    }

    #[test]
    fn test_build_update_txn_add_new_region() {
        let manager = new_manager();
        let old_region_wal_options = encode([(0, kafka("topic_0"))]);
        let new_region_wal_options = encode([(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(TABLE_ID, &old_region_wal_options, &new_region_wal_options)
            .unwrap();

        let (deletes, puts) = decode_success_ops(&txn);
        assert!(deletes.is_empty());
        assert_eq!(puts, targets(&[(1, "topic_1")]));
    }

    #[test]
    fn test_build_update_txn_remove_region() {
        let manager = new_manager();
        let old_region_wal_options = encode([(0, kafka("topic_0")), (1, kafka("topic_1"))]);
        let new_region_wal_options = encode([(0, kafka("topic_0"))]);

        let txn = manager
            .build_update_txn(TABLE_ID, &old_region_wal_options, &new_region_wal_options)
            .unwrap();

        let (deletes, puts) = decode_success_ops(&txn);
        assert_eq!(deletes, targets(&[(1, "topic_1")]));
        assert!(puts.is_empty());
    }

    #[test]
    fn test_build_update_txn_change_topic() {
        let manager = new_manager();
        let old_region_wal_options = encode([(0, kafka("topic_0"))]);
        let new_region_wal_options = encode([(0, kafka("topic_0_new"))]);

        let txn = manager
            .build_update_txn(TABLE_ID, &old_region_wal_options, &new_region_wal_options)
            .unwrap();

        // A topic change is a delete of the old key plus a put of the new one.
        let (deletes, puts) = decode_success_ops(&txn);
        assert_eq!(deletes, targets(&[(0, "topic_0")]));
        assert_eq!(puts, targets(&[(0, "topic_0_new")]));
    }

    #[test]
    fn test_build_update_txn_no_change() {
        let manager = new_manager();
        let region_wal_options = encode([(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(TABLE_ID, &region_wal_options, &region_wal_options)
            .unwrap();

        assert_eq!(txn.req().success.len(), 0);
    }

    #[test]
    fn test_build_update_txn_mixed_scenarios() {
        let manager = new_manager();
        let old_region_wal_options = encode([
            (0, kafka("topic_0")),
            (1, kafka("topic_1")),
            (2, kafka("topic_2")),
        ]);
        let new_region_wal_options = encode([
            // Region 0 is unchanged, region 1 changes topic, region 2 is
            // removed and region 3 is new.
            (0, kafka("topic_0")),
            (1, kafka("topic_1_new")),
            (3, kafka("topic_3")),
        ]);

        let txn = manager
            .build_update_txn(TABLE_ID, &old_region_wal_options, &new_region_wal_options)
            .unwrap();

        assert_eq!(txn.req().success.len(), 4);
        let (deletes, puts) = decode_success_ops(&txn);
        assert_eq!(deletes, targets(&[(1, "topic_1"), (2, "topic_2")]));
        assert_eq!(puts, targets(&[(1, "topic_1_new"), (3, "topic_3")]));
    }

    #[test]
    fn test_build_update_txn_with_raft_engine() {
        let manager = new_manager();
        let old_region_wal_options = encode([(0, kafka("topic_0")), (1, WalOptions::RaftEngine)]);
        let new_region_wal_options = encode([(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(TABLE_ID, &old_region_wal_options, &new_region_wal_options)
            .unwrap();

        // The raft engine region had no key before, so moving it to Kafka
        // only needs a put.
        let (deletes, puts) = decode_success_ops(&txn);
        assert!(deletes.is_empty());
        assert_eq!(puts, targets(&[(1, "topic_1")]));
    }
}