1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionId;
22use table::metadata::TableId;
23
24use crate::error::{Error, InvalidMetadataSnafu, Result};
25use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
26use crate::kv_backend::KvBackendRef;
27use crate::kv_backend::txn::{Txn, TxnOp};
28use crate::rpc::KeyValue;
29use crate::rpc::store::{
30 BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
31};
32use crate::wal_provider::RegionWalOptions;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct TopicRegionKey<'a> {
38 pub region_id: RegionId,
39 pub topic: &'a str,
40}
41
42#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
44pub struct TopicRegionValue {
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub checkpoint: Option<ReplayCheckpoint>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
50pub struct ReplayCheckpoint {
51 #[serde(default)]
52 pub entry_id: u64,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub metadata_entry_id: Option<u64>,
55}
56
57impl<'a> TopicRegionKey<'a> {
58 pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59 Self { region_id, topic }
60 }
61
62 pub fn range_topic_key(topic: &str) -> String {
63 format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64 }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68 fn to_bytes(&self) -> Vec<u8> {
69 self.to_string().into_bytes()
70 }
71
72 fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73 let key = std::str::from_utf8(bytes).map_err(|e| {
74 InvalidMetadataSnafu {
75 err_msg: format!(
76 "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77 String::from_utf8_lossy(bytes)
78 ),
79 }
80 .build()
81 })?;
82 TopicRegionKey::try_from(key)
83 }
84}
85
86impl Display for TopicRegionKey<'_> {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 write!(
89 f,
90 "{}{}",
91 Self::range_topic_key(self.topic),
92 self.region_id.as_u64()
93 )
94 }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98 type Error = Error;
99
100 fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102 let captures = TOPIC_REGION_PATTERN
103 .captures(value)
104 .context(InvalidMetadataSnafu {
105 err_msg: format!("Invalid TopicRegionKey: {}", value),
106 })?;
107 let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108 let region_id = captures[2].parse::<u64>().map_err(|_| {
109 InvalidMetadataSnafu {
110 err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111 }
112 .build()
113 })?;
114 Ok(TopicRegionKey {
115 region_id: RegionId::from_u64(region_id),
116 topic,
117 })
118 }
119}
120
121impl ReplayCheckpoint {
122 pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124 Self {
125 entry_id,
126 metadata_entry_id,
127 }
128 }
129
130 pub fn merge_with_topic_pruned_entry_id(
132 checkpoint: Option<Self>,
133 pruned_entry_id: Option<u64>,
134 is_metric_engine: bool,
135 ) -> Option<Self> {
136 match (checkpoint, pruned_entry_id) {
137 (Some(checkpoint), Some(pruned_entry_id)) => Some(Self {
138 entry_id: checkpoint.entry_id.max(pruned_entry_id),
139 metadata_entry_id: if is_metric_engine {
140 Some(
141 checkpoint
142 .metadata_entry_id
143 .unwrap_or_default()
144 .max(pruned_entry_id),
145 )
146 } else {
147 checkpoint.metadata_entry_id
148 },
149 }),
150 (None, Some(pruned_entry_id)) => Some(Self {
151 entry_id: pruned_entry_id,
152 metadata_entry_id: is_metric_engine.then_some(pruned_entry_id),
153 }),
154 (checkpoint, None) => checkpoint,
155 }
156 }
157}
158
159impl TopicRegionValue {
160 pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
162 Self { checkpoint }
163 }
164
165 pub fn min_entry_id(&self) -> Option<u64> {
169 match self.checkpoint {
170 Some(ReplayCheckpoint {
171 entry_id,
172 metadata_entry_id,
173 }) => match metadata_entry_id {
174 Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
175 None => Some(entry_id),
176 },
177 None => None,
178 }
179 }
180}
181
182fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
183 let key = TopicRegionKey::from_bytes(&value.key)?;
184 let value = if value.value.is_empty() {
185 TopicRegionValue::default()
186 } else {
187 TopicRegionValue::try_from_raw_value(&value.value)?
188 };
189 Ok((key, value))
190}
191
192pub struct TopicRegionManager {
194 kv_backend: KvBackendRef,
195}
196
197impl TopicRegionManager {
198 pub fn new(kv_backend: KvBackendRef) -> Self {
199 Self { kv_backend }
200 }
201
202 pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
203 let put_req = PutRequest {
204 key: key.to_bytes(),
205 value: vec![],
206 prev_kv: false,
207 };
208 self.kv_backend.put(put_req).await?;
209 Ok(())
210 }
211
212 pub async fn batch_get(
213 &self,
214 keys: Vec<TopicRegionKey<'_>>,
215 ) -> Result<HashMap<RegionId, TopicRegionValue>> {
216 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
217 let req = BatchGetRequest { keys: raw_keys };
218 let resp = self.kv_backend.batch_get(req).await?;
219
220 let v = resp
221 .kvs
222 .into_iter()
223 .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
224 .collect::<Result<HashMap<_, _>>>()?;
225
226 Ok(v)
227 }
228
229 pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
230 let key_bytes = key.to_bytes();
231 let resp = self.kv_backend.get(&key_bytes).await?;
232 let value = resp
233 .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
234 .transpose()?;
235
236 Ok(value)
237 }
238
239 pub async fn batch_put(
240 &self,
241 keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
242 ) -> Result<()> {
243 let req = BatchPutRequest {
244 kvs: keys
245 .iter()
246 .map(|(key, value)| {
247 let value = value
248 .map(|v| v.try_as_raw_value())
249 .transpose()?
250 .unwrap_or_default();
251
252 Ok(KeyValue {
253 key: key.to_bytes(),
254 value,
255 })
256 })
257 .collect::<Result<Vec<_>>>()?,
258 prev_kv: false,
259 };
260 self.kv_backend.batch_put(req).await?;
261 Ok(())
262 }
263
264 pub fn build_create_txn(
266 &self,
267 table_id: TableId,
268 region_wal_options: &RegionWalOptions,
269 ) -> Result<Txn> {
270 let topic_region_mapping = self.get_topic_region_mapping(table_id, region_wal_options);
271 let topic_region_keys = topic_region_mapping
272 .iter()
273 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
274 .collect::<Vec<_>>();
275 let operations = topic_region_keys
276 .into_iter()
277 .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
278 .collect::<Vec<_>>();
279 Ok(Txn::new().and_then(operations))
280 }
281
282 pub fn build_update_txn(
284 &self,
285 table_id: TableId,
286 old_region_wal_options: &RegionWalOptions,
287 new_region_wal_options: &RegionWalOptions,
288 ) -> Result<Txn> {
289 let old_mapping = self.get_topic_region_mapping(table_id, old_region_wal_options);
290 let new_mapping = self.get_topic_region_mapping(table_id, new_region_wal_options);
291
292 let old_map: HashMap<RegionId, &str> = old_mapping.into_iter().collect();
294 let new_map: HashMap<RegionId, &str> = new_mapping.into_iter().collect();
295 let mut ops = Vec::new();
296
297 for (region_id, old_topic) in &old_map {
299 match new_map.get(region_id) {
300 Some(new_topic) if *new_topic == *old_topic => {
301 }
303 _ => {
304 let key = TopicRegionKey::new(*region_id, old_topic);
306 ops.push(TxnOp::Delete(key.to_bytes()));
307 }
308 }
309 }
310
311 for (region_id, new_topic) in &new_map {
313 match old_map.get(region_id) {
314 Some(old_topic) if *old_topic == *new_topic => {
315 }
317 _ => {
318 let key = TopicRegionKey::new(*region_id, new_topic);
320 ops.push(TxnOp::Put(key.to_bytes(), vec![]));
322 }
323 }
324 }
325
326 Ok(Txn::new().and_then(ops))
327 }
328
329 pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
331 let prefix = TopicRegionKey::range_topic_key(topic);
332 let req = RangeRequest::new().with_prefix(prefix.as_bytes());
333 let resp = self.kv_backend.range(req).await?;
334 let region_ids = resp
335 .kvs
336 .iter()
337 .map(topic_region_decoder)
338 .collect::<Result<Vec<_>>>()?;
339 Ok(region_ids
340 .into_iter()
341 .map(|(key, value)| (key.region_id, value))
342 .collect())
343 }
344
345 pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
346 let raw_key = key.to_bytes();
347 self.kv_backend.delete(&raw_key, false).await?;
348 Ok(())
349 }
350
351 pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
352 let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
353 let req = BatchDeleteRequest {
354 keys: raw_keys,
355 prev_kv: false,
356 };
357 self.kv_backend.batch_delete(req).await?;
358 Ok(())
359 }
360
361 pub fn get_topic_region_mapping<'a>(
367 &self,
368 table_id: TableId,
369 region_wal_options: &'a RegionWalOptions,
370 ) -> Vec<(RegionId, &'a str)> {
371 region_wal_options
372 .keys()
373 .filter_map(
374 |region_number| match region_wal_options.get(region_number) {
375 Some(WalOptions::Kafka(kafka)) => {
376 let region_id = RegionId::new(table_id, *region_number);
377 Some((region_id, kafka.topic.as_str()))
378 }
379 Some(WalOptions::RaftEngine) => None,
380 Some(WalOptions::Noop) => None,
381 None => None,
382 },
383 )
384 .collect::<Vec<_>>()
385 }
386}
387
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_wal::options::KafkaWalOptions;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Builds a manager backed by a fresh in-memory kv backend.
    fn new_manager() -> TopicRegionManager {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        TopicRegionManager::new(kv_backend)
    }

    /// Builds Kafka WAL options that point at `topic`.
    fn kafka(topic: &str) -> WalOptions {
        WalOptions::Kafka(KafkaWalOptions::new(topic.to_string()))
    }

    /// Returns the region ids stored under `topic`, sorted by their `u64` value.
    async fn sorted_region_ids(manager: &TopicRegionManager, topic: &str) -> Vec<RegionId> {
        let mut region_ids = manager
            .regions(topic)
            .await
            .unwrap()
            .into_keys()
            .collect::<Vec<_>>();
        region_ids.sort_by_key(|id| id.as_u64());
        region_ids
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_missing_pruned() {
        let checkpoint = Some(ReplayCheckpoint::new(10, None));

        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(checkpoint, None, true),
            checkpoint
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_creates_checkpoint() {
        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), true),
            Some(ReplayCheckpoint::new(10, Some(10)))
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_updates_both_ids() {
        let checkpoint = ReplayCheckpoint::new(10, Some(5));

        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
            Some(ReplayCheckpoint::new(20, Some(20)))
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_preserves_larger_ids() {
        let checkpoint = ReplayCheckpoint::new(30, Some(40));

        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
            Some(checkpoint)
        );
    }

    #[test]
    fn test_merge_checkpoint_with_topic_pruned_entry_id_for_mito() {
        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), false),
            Some(ReplayCheckpoint::new(10, None))
        );

        let checkpoint = ReplayCheckpoint::new(5, Some(8));
        assert_eq!(
            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(10), false),
            Some(ReplayCheckpoint::new(10, Some(8)))
        );
    }

    #[tokio::test]
    async fn test_topic_region_manager() {
        let manager = new_manager();

        // 64 regions spread round-robin over 16 topics.
        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
        let keys = (0..64)
            .map(|i| {
                (
                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
                    None,
                )
            })
            .collect::<Vec<_>>();
        manager.batch_put(&keys).await.unwrap();

        let expected = keys
            .iter()
            .filter_map(|(key, _)| (key.topic == topics[0]).then_some(key.region_id))
            .collect::<Vec<_>>();
        assert_eq!(sorted_region_ids(&manager, &topics[0]).await, expected);

        // Deleting one record removes exactly that region from the topic.
        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
        manager.delete(key).await.unwrap();

        let expected = keys
            .iter()
            .filter_map(|(key, _)| {
                (key.topic == topics[0] && key.region_id != RegionId::from_u64(0))
                    .then_some(key.region_id)
            })
            .collect::<Vec<_>>();
        assert_eq!(sorted_region_ids(&manager, &topics[0]).await, expected);
    }

    #[test]
    fn test_topic_region_map() {
        let manager = new_manager();

        let table_id = 1;
        // Even regions use Kafka, odd regions use the raft engine.
        let region_wal_options = (0..64)
            .map(|i| {
                let wal_options = if i % 2 == 0 {
                    kafka(&format!("topic_{}", i))
                } else {
                    WalOptions::RaftEngine
                };
                (i, wal_options)
            })
            .collect::<HashMap<_, _>>();

        let mut topic_region_mapping =
            manager.get_topic_region_mapping(table_id, &region_wal_options);
        let mut expected = (0..64)
            .filter_map(|i| {
                if i % 2 == 0 {
                    Some((RegionId::new(table_id, i), format!("topic_{}", i)))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
        let topic_region_map = topic_region_mapping
            .iter()
            .map(|(region_id, topic)| (*region_id, topic.to_string()))
            .collect::<Vec<_>>();
        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
        assert_eq!(topic_region_map, expected);
    }

    #[test]
    fn test_topic_region_key_is_match() {
        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
        assert_eq!(
            topic_region_key.topic,
            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
        );
        assert_eq!(
            topic_region_key.region_id,
            RegionId::from_u64(4410931412992)
        );
    }

    #[test]
    fn test_build_create_txn() {
        let manager = new_manager();
        let table_id = 1;
        let region_wal_options = HashMap::from([
            (0, kafka("topic_0")),
            (1, kafka("topic_1")),
            // Not Kafka: must not produce an operation.
            (2, WalOptions::RaftEngine),
        ]);

        let txn = manager
            .build_create_txn(table_id, &region_wal_options)
            .unwrap();

        // Only the two Kafka regions are written.
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 2);

        let keys: Vec<_> = ops
            .iter()
            .filter_map(|op| {
                if let TxnOp::Put(key, _) = op {
                    TopicRegionKey::from_bytes(key).ok()
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(keys.len(), 2);
        let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect();
        assert!(region_ids.contains(&RegionId::new(table_id, 0)));
        assert!(region_ids.contains(&RegionId::new(table_id, 1)));
        assert!(!region_ids.contains(&RegionId::new(table_id, 2)));

        // Every key carries the topic of its own region.
        for key in keys {
            match key.region_id.region_number() {
                0 => assert_eq!(key.topic, "topic_0"),
                1 => assert_eq!(key.topic, "topic_1"),
                _ => panic!("Unexpected region number"),
            }
        }
    }

    #[test]
    fn test_build_update_txn_add_new_region() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options = HashMap::from([(0, kafka("topic_0"))]);
        let new_region_wal_options =
            HashMap::from([(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 1);

        // The only operation is the put for the newly added region 1.
        if let TxnOp::Put(key, _) = &ops[0] {
            let topic_key = TopicRegionKey::from_bytes(key).unwrap();
            assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
            assert_eq!(topic_key.topic, "topic_1");
        } else {
            panic!("Expected Put operation");
        }
    }

    #[test]
    fn test_build_update_txn_remove_region() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options =
            HashMap::from([(0, kafka("topic_0")), (1, kafka("topic_1"))]);
        let new_region_wal_options = HashMap::from([(0, kafka("topic_0"))]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 1);

        // The only operation is the delete for the removed region 1.
        match &ops[0] {
            TxnOp::Delete(key) => {
                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
                assert_eq!(topic_key.topic, "topic_1");
            }
            TxnOp::Put(_, _) | TxnOp::Get(_) => {
                panic!("Expected Delete operation");
            }
        }
    }

    #[test]
    fn test_build_update_txn_change_topic() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options = HashMap::from([(0, kafka("topic_0"))]);
        let new_region_wal_options = HashMap::from([(0, kafka("topic_0_new"))]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        // A topic change is a delete of the old record plus a put of the new one.
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 2);

        let mut delete_found = false;
        let mut put_found = false;
        for op in ops {
            match op {
                TxnOp::Delete(key) => {
                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
                    assert_eq!(topic_key.topic, "topic_0");
                    delete_found = true;
                }
                TxnOp::Put(key, _) => {
                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
                    assert_eq!(topic_key.topic, "topic_0_new");
                    put_found = true;
                }
                TxnOp::Get(_) => {
                    panic!("Unexpected Get operation in update transaction");
                }
            }
        }
        assert!(delete_found, "Expected Delete operation for old topic");
        assert!(put_found, "Expected Put operation for new topic");
    }

    #[test]
    fn test_build_update_txn_no_change() {
        let manager = new_manager();
        let table_id = 1;
        let region_wal_options = HashMap::from([(0, kafka("topic_0")), (1, kafka("topic_1"))]);

        let txn = manager
            .build_update_txn(table_id, &region_wal_options, &region_wal_options)
            .unwrap();
        // Identical options on both sides produce no operation at all.
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 0);
    }

    #[test]
    fn test_build_update_txn_mixed_scenarios() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options = HashMap::from([
            (0, kafka("topic_0")),
            (1, kafka("topic_1")),
            (2, kafka("topic_2")),
        ]);
        let new_region_wal_options = HashMap::from([
            // Unchanged.
            (0, kafka("topic_0")),
            // Topic changed.
            (1, kafka("topic_1_new")),
            // Newly added; region 2 is removed.
            (3, kafka("topic_3")),
        ]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();

        // Two deletes (region 1 old topic, region 2) and two puts (region 1 new topic, region 3).
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 4);

        let mut delete_ops = 0;
        let mut put_ops = 0;
        let mut delete_region_2 = false;
        let mut delete_region_1_old = false;
        let mut put_region_1_new = false;
        let mut put_region_3 = false;

        for op in ops {
            match op {
                TxnOp::Delete(key) => {
                    delete_ops += 1;
                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
                    match topic_key.region_id.region_number() {
                        1 => {
                            assert_eq!(topic_key.topic, "topic_1");
                            delete_region_1_old = true;
                        }
                        2 => {
                            assert_eq!(topic_key.topic, "topic_2");
                            delete_region_2 = true;
                        }
                        _ => panic!("Unexpected delete operation for region"),
                    }
                }
                TxnOp::Put(key, _) => {
                    put_ops += 1;
                    let topic_key: TopicRegionKey<'_> = TopicRegionKey::from_bytes(&key).unwrap();
                    match topic_key.region_id.region_number() {
                        1 => {
                            assert_eq!(topic_key.topic, "topic_1_new");
                            put_region_1_new = true;
                        }
                        3 => {
                            assert_eq!(topic_key.topic, "topic_3");
                            put_region_3 = true;
                        }
                        _ => panic!("Unexpected put operation for region"),
                    }
                }
                TxnOp::Get(_) => {
                    panic!("Unexpected Get operation in update transaction");
                }
            }
        }

        assert_eq!(delete_ops, 2);
        assert_eq!(put_ops, 2);
        assert!(delete_region_2, "Expected delete for removed region 2");
        assert!(
            delete_region_1_old,
            "Expected delete for region 1 old topic"
        );
        assert!(put_region_1_new, "Expected put for region 1 new topic");
        assert!(put_region_3, "Expected put for new region 3");
    }

    #[test]
    fn test_build_update_txn_with_raft_engine() {
        let manager = new_manager();
        let table_id = 1;
        let old_region_wal_options = HashMap::from([
            (0, kafka("topic_0")),
            // Not Kafka, so it has no stored record to delete.
            (1, WalOptions::RaftEngine),
        ]);
        let new_region_wal_options = HashMap::from([
            (0, kafka("topic_0")),
            // Switched to Kafka.
            (1, kafka("topic_1")),
        ]);

        let txn = manager
            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
            .unwrap();
        let ops = txn.req().success.clone();
        assert_eq!(ops.len(), 1);

        // The only operation is the put for region 1, which moved to Kafka.
        match &ops[0] {
            TxnOp::Put(key, _) => {
                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
                assert_eq!(topic_key.topic, "topic_1");
            }
            TxnOp::Delete(_) | TxnOp::Get(_) => {
                panic!("Expected Put operation for new Kafka region");
            }
        }
    }
}