Skip to main content

common_meta/key/
topic_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::ddl::utils::parse_region_wal_options;
25use crate::error::{Error, InvalidMetadataSnafu, Result};
26use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
27use crate::kv_backend::KvBackendRef;
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::rpc::KeyValue;
30use crate::rpc::store::{
31    BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
32};
33
34// The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
35// The layout of the key is `__topic_region/{topic_name}/{region_id}`.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct TopicRegionKey<'a> {
38    pub region_id: RegionId,
39    pub topic: &'a str,
40}
41
42/// Represents additional information for a region when using a shared WAL.
43#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
44pub struct TopicRegionValue {
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub checkpoint: Option<ReplayCheckpoint>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
50pub struct ReplayCheckpoint {
51    #[serde(default)]
52    pub entry_id: u64,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub metadata_entry_id: Option<u64>,
55}
56
57impl<'a> TopicRegionKey<'a> {
58    pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59        Self { region_id, topic }
60    }
61
62    pub fn range_topic_key(topic: &str) -> String {
63        format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64    }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68    fn to_bytes(&self) -> Vec<u8> {
69        self.to_string().into_bytes()
70    }
71
72    fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73        let key = std::str::from_utf8(bytes).map_err(|e| {
74            InvalidMetadataSnafu {
75                err_msg: format!(
76                    "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77                    String::from_utf8_lossy(bytes)
78                ),
79            }
80            .build()
81        })?;
82        TopicRegionKey::try_from(key)
83    }
84}
85
86impl Display for TopicRegionKey<'_> {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        write!(
89            f,
90            "{}{}",
91            Self::range_topic_key(self.topic),
92            self.region_id.as_u64()
93        )
94    }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98    type Error = Error;
99
100    /// Value is of format `{prefix}/{topic}/{region_id}`
101    fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102        let captures = TOPIC_REGION_PATTERN
103            .captures(value)
104            .context(InvalidMetadataSnafu {
105                err_msg: format!("Invalid TopicRegionKey: {}", value),
106            })?;
107        let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108        let region_id = captures[2].parse::<u64>().map_err(|_| {
109            InvalidMetadataSnafu {
110                err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111            }
112            .build()
113        })?;
114        Ok(TopicRegionKey {
115            region_id: RegionId::from_u64(region_id),
116            topic,
117        })
118    }
119}
120
121impl ReplayCheckpoint {
122    /// Creates a new [`ReplayCheckpoint`] with the given entry id and metadata entry id.
123    pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124        Self {
125            entry_id,
126            metadata_entry_id,
127        }
128    }
129
130    /// Merges the checkpoint with the topic pruned entry id.
131    pub fn merge_with_topic_pruned_entry_id(
132        checkpoint: Option<Self>,
133        pruned_entry_id: Option<u64>,
134        is_metric_engine: bool,
135    ) -> Option<Self> {
136        match (checkpoint, pruned_entry_id) {
137            (Some(checkpoint), Some(pruned_entry_id)) => Some(Self {
138                entry_id: checkpoint.entry_id.max(pruned_entry_id),
139                metadata_entry_id: if is_metric_engine {
140                    Some(
141                        checkpoint
142                            .metadata_entry_id
143                            .unwrap_or_default()
144                            .max(pruned_entry_id),
145                    )
146                } else {
147                    checkpoint.metadata_entry_id
148                },
149            }),
150            (None, Some(pruned_entry_id)) => Some(Self {
151                entry_id: pruned_entry_id,
152                metadata_entry_id: is_metric_engine.then_some(pruned_entry_id),
153            }),
154            (checkpoint, None) => checkpoint,
155        }
156    }
157}
158
159impl TopicRegionValue {
160    /// Creates a new [`TopicRegionValue`] with the given checkpoint.
161    pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
162        Self { checkpoint }
163    }
164
165    /// Returns the minimum entry id of the region.
166    ///
167    /// If the metadata entry id is not set, it returns the entry id.
168    pub fn min_entry_id(&self) -> Option<u64> {
169        match self.checkpoint {
170            Some(ReplayCheckpoint {
171                entry_id,
172                metadata_entry_id,
173            }) => match metadata_entry_id {
174                Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
175                None => Some(entry_id),
176            },
177            None => None,
178        }
179    }
180}
181
182fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
183    let key = TopicRegionKey::from_bytes(&value.key)?;
184    let value = if value.value.is_empty() {
185        TopicRegionValue::default()
186    } else {
187        TopicRegionValue::try_from_raw_value(&value.value)?
188    };
189    Ok((key, value))
190}
191
192/// Manages map of topics and regions in kvbackend.
193pub struct TopicRegionManager {
194    kv_backend: KvBackendRef,
195}
196
197impl TopicRegionManager {
198    pub fn new(kv_backend: KvBackendRef) -> Self {
199        Self { kv_backend }
200    }
201
202    pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
203        let put_req = PutRequest {
204            key: key.to_bytes(),
205            value: vec![],
206            prev_kv: false,
207        };
208        self.kv_backend.put(put_req).await?;
209        Ok(())
210    }
211
212    pub async fn batch_get(
213        &self,
214        keys: Vec<TopicRegionKey<'_>>,
215    ) -> Result<HashMap<RegionId, TopicRegionValue>> {
216        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
217        let req = BatchGetRequest { keys: raw_keys };
218        let resp = self.kv_backend.batch_get(req).await?;
219
220        let v = resp
221            .kvs
222            .into_iter()
223            .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
224            .collect::<Result<HashMap<_, _>>>()?;
225
226        Ok(v)
227    }
228
229    pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
230        let key_bytes = key.to_bytes();
231        let resp = self.kv_backend.get(&key_bytes).await?;
232        let value = resp
233            .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
234            .transpose()?;
235
236        Ok(value)
237    }
238
239    pub async fn batch_put(
240        &self,
241        keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
242    ) -> Result<()> {
243        let req = BatchPutRequest {
244            kvs: keys
245                .iter()
246                .map(|(key, value)| {
247                    let value = value
248                        .map(|v| v.try_as_raw_value())
249                        .transpose()?
250                        .unwrap_or_default();
251
252                    Ok(KeyValue {
253                        key: key.to_bytes(),
254                        value,
255                    })
256                })
257                .collect::<Result<Vec<_>>>()?,
258            prev_kv: false,
259        };
260        self.kv_backend.batch_put(req).await?;
261        Ok(())
262    }
263
264    /// Build a create topic region mapping transaction. It only executes while the primary keys comparing successes.
265    pub fn build_create_txn(
266        &self,
267        table_id: TableId,
268        region_wal_options: &HashMap<RegionNumber, String>,
269    ) -> Result<Txn> {
270        let region_wal_options = parse_region_wal_options(region_wal_options)?;
271        let topic_region_mapping = self.get_topic_region_mapping(table_id, &region_wal_options);
272        let topic_region_keys = topic_region_mapping
273            .iter()
274            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
275            .collect::<Vec<_>>();
276        let operations = topic_region_keys
277            .into_iter()
278            .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
279            .collect::<Vec<_>>();
280        Ok(Txn::new().and_then(operations))
281    }
282
283    /// Build a update topic region mapping transaction.
284    pub fn build_update_txn(
285        &self,
286        table_id: TableId,
287        old_region_wal_options: &HashMap<RegionNumber, String>,
288        new_region_wal_options: &HashMap<RegionNumber, String>,
289    ) -> Result<Txn> {
290        let old_wal_options_parsed = parse_region_wal_options(old_region_wal_options)?;
291        let new_wal_options_parsed = parse_region_wal_options(new_region_wal_options)?;
292        let old_mapping = self.get_topic_region_mapping(table_id, &old_wal_options_parsed);
293        let new_mapping = self.get_topic_region_mapping(table_id, &new_wal_options_parsed);
294
295        // Convert to HashMap for easier lookup: RegionId -> Topic
296        let old_map: HashMap<RegionId, &str> = old_mapping.into_iter().collect();
297        let new_map: HashMap<RegionId, &str> = new_mapping.into_iter().collect();
298        let mut ops = Vec::new();
299
300        // Check for deletes (in old but not in new, or topic changed)
301        for (region_id, old_topic) in &old_map {
302            match new_map.get(region_id) {
303                Some(new_topic) if *new_topic == *old_topic => {
304                    // Same topic, do nothing (preserve checkpoint)
305                }
306                _ => {
307                    // Removed or topic changed -> Delete old
308                    let key = TopicRegionKey::new(*region_id, old_topic);
309                    ops.push(TxnOp::Delete(key.to_bytes()));
310                }
311            }
312        }
313
314        // Check for adds (in new but not in old, or topic changed)
315        for (region_id, new_topic) in &new_map {
316            match old_map.get(region_id) {
317                Some(old_topic) if *old_topic == *new_topic => {
318                    // Same topic, already handled (do nothing)
319                }
320                _ => {
321                    // New or topic changed -> Put new
322                    let key = TopicRegionKey::new(*region_id, new_topic);
323                    // Initialize with empty value (default TopicRegionValue)
324                    ops.push(TxnOp::Put(key.to_bytes(), vec![]));
325                }
326            }
327        }
328
329        Ok(Txn::new().and_then(ops))
330    }
331
332    /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`].
333    pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
334        let prefix = TopicRegionKey::range_topic_key(topic);
335        let req = RangeRequest::new().with_prefix(prefix.as_bytes());
336        let resp = self.kv_backend.range(req).await?;
337        let region_ids = resp
338            .kvs
339            .iter()
340            .map(topic_region_decoder)
341            .collect::<Result<Vec<_>>>()?;
342        Ok(region_ids
343            .into_iter()
344            .map(|(key, value)| (key.region_id, value))
345            .collect())
346    }
347
348    pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
349        let raw_key = key.to_bytes();
350        self.kv_backend.delete(&raw_key, false).await?;
351        Ok(())
352    }
353
354    pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
355        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
356        let req = BatchDeleteRequest {
357            keys: raw_keys,
358            prev_kv: false,
359        };
360        self.kv_backend.batch_delete(req).await?;
361        Ok(())
362    }
363
364    /// Retrieves a mapping of [`RegionId`]s to their corresponding topics name
365    /// based on the provided table ID and WAL options.
366    ///
367    /// # Returns
368    /// A vector of tuples, where each tuple contains a [`RegionId`] and its corresponding topic name.
369    pub fn get_topic_region_mapping<'a>(
370        &self,
371        table_id: TableId,
372        region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
373    ) -> Vec<(RegionId, &'a str)> {
374        region_wal_options
375            .keys()
376            .filter_map(
377                |region_number| match region_wal_options.get(region_number) {
378                    Some(WalOptions::Kafka(kafka)) => {
379                        let region_id = RegionId::new(table_id, *region_number);
380                        Some((region_id, kafka.topic.as_str()))
381                    }
382                    Some(WalOptions::RaftEngine) => None,
383                    Some(WalOptions::Noop) => None,
384                    None => None,
385                },
386            )
387            .collect::<Vec<_>>()
388    }
389}
390
391#[cfg(test)]
392mod tests {
393    use std::sync::Arc;
394
395    use common_wal::options::KafkaWalOptions;
396
397    use super::*;
398    use crate::kv_backend::memory::MemoryKvBackend;
399
400    #[test]
401    fn test_merge_checkpoint_with_topic_pruned_entry_id_missing_pruned() {
402        let checkpoint = Some(ReplayCheckpoint::new(10, None));
403
404        assert_eq!(
405            ReplayCheckpoint::merge_with_topic_pruned_entry_id(checkpoint, None, true),
406            checkpoint
407        );
408    }
409
410    #[test]
411    fn test_merge_checkpoint_with_topic_pruned_entry_id_creates_checkpoint() {
412        assert_eq!(
413            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), true),
414            Some(ReplayCheckpoint::new(10, Some(10)))
415        );
416    }
417
418    #[test]
419    fn test_merge_checkpoint_with_topic_pruned_entry_id_updates_both_ids() {
420        let checkpoint = ReplayCheckpoint::new(10, Some(5));
421
422        assert_eq!(
423            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
424            Some(ReplayCheckpoint::new(20, Some(20)))
425        );
426    }
427
428    #[test]
429    fn test_merge_checkpoint_with_topic_pruned_entry_id_preserves_larger_ids() {
430        let checkpoint = ReplayCheckpoint::new(30, Some(40));
431
432        assert_eq!(
433            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
434            Some(checkpoint)
435        );
436    }
437
438    #[test]
439    fn test_merge_checkpoint_with_topic_pruned_entry_id_for_mito() {
440        assert_eq!(
441            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), false),
442            Some(ReplayCheckpoint::new(10, None))
443        );
444
445        let checkpoint = ReplayCheckpoint::new(5, Some(8));
446        assert_eq!(
447            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(10), false),
448            Some(ReplayCheckpoint::new(10, Some(8)))
449        );
450    }
451
452    #[tokio::test]
453    async fn test_topic_region_manager() {
454        let kv_backend = Arc::new(MemoryKvBackend::default());
455        let manager = TopicRegionManager::new(kv_backend.clone());
456
457        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
458        let keys = (0..64)
459            .map(|i| {
460                (
461                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
462                    None,
463                )
464            })
465            .collect::<Vec<_>>();
466
467        manager.batch_put(&keys).await.unwrap();
468        let mut key_values = manager
469            .regions(&topics[0])
470            .await
471            .unwrap()
472            .into_keys()
473            .collect::<Vec<_>>();
474        let expected = keys
475            .iter()
476            .filter_map(|(key, _)| {
477                if key.topic == topics[0] {
478                    Some(key.region_id)
479                } else {
480                    None
481                }
482            })
483            .collect::<Vec<_>>();
484        key_values.sort_by_key(|id| id.as_u64());
485        assert_eq!(key_values, expected);
486
487        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
488        manager.delete(key.clone()).await.unwrap();
489        let mut key_values = manager
490            .regions(&topics[0])
491            .await
492            .unwrap()
493            .into_keys()
494            .collect::<Vec<_>>();
495        let expected = keys
496            .iter()
497            .filter_map(|(key, _)| {
498                if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
499                    Some(key.region_id)
500                } else {
501                    None
502                }
503            })
504            .collect::<Vec<_>>();
505        key_values.sort_by_key(|id| id.as_u64());
506        assert_eq!(key_values, expected);
507    }
508
509    #[test]
510    fn test_topic_region_map() {
511        let kv_backend = Arc::new(MemoryKvBackend::default());
512        let manager = TopicRegionManager::new(kv_backend.clone());
513
514        let table_id = 1;
515        let region_wal_options = (0..64)
516            .map(|i| {
517                let region_number = i;
518                let wal_options = if i % 2 == 0 {
519                    WalOptions::Kafka(KafkaWalOptions {
520                        topic: format!("topic_{}", i),
521                    })
522                } else {
523                    WalOptions::RaftEngine
524                };
525                (region_number, serde_json::to_string(&wal_options).unwrap())
526            })
527            .collect::<HashMap<_, _>>();
528
529        let region_wal_options = parse_region_wal_options(&region_wal_options).unwrap();
530        let mut topic_region_mapping =
531            manager.get_topic_region_mapping(table_id, &region_wal_options);
532        let mut expected = (0..64)
533            .filter_map(|i| {
534                if i % 2 == 0 {
535                    Some((RegionId::new(table_id, i), format!("topic_{}", i)))
536                } else {
537                    None
538                }
539            })
540            .collect::<Vec<_>>();
541        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
542        let topic_region_map = topic_region_mapping
543            .iter()
544            .map(|(region_id, topic)| (*region_id, topic.to_string()))
545            .collect::<Vec<_>>();
546        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
547        assert_eq!(topic_region_map, expected);
548    }
549
550    #[test]
551    fn test_topic_region_key_is_match() {
552        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
553        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
554        assert_eq!(
555            topic_region_key.topic,
556            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
557        );
558        assert_eq!(
559            topic_region_key.region_id,
560            RegionId::from_u64(4410931412992)
561        );
562    }
563
564    #[test]
565    fn test_build_create_txn() {
566        let kv_backend = Arc::new(MemoryKvBackend::default());
567        let manager = TopicRegionManager::new(kv_backend.clone());
568        let table_id = 1;
569        let region_wal_options = vec![
570            (
571                0,
572                WalOptions::Kafka(KafkaWalOptions {
573                    topic: "topic_0".to_string(),
574                }),
575            ),
576            (
577                1,
578                WalOptions::Kafka(KafkaWalOptions {
579                    topic: "topic_1".to_string(),
580                }),
581            ),
582            (2, WalOptions::RaftEngine), // Should be ignored
583        ]
584        .into_iter()
585        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
586        .collect::<HashMap<_, _>>();
587
588        let txn = manager
589            .build_create_txn(table_id, &region_wal_options)
590            .unwrap();
591
592        // Verify the transaction contains correct operations
593        // Should create mappings for region 0 and 1, but not region 2 (RaftEngine)
594        let ops = txn.req().success.clone();
595        assert_eq!(ops.len(), 2);
596
597        let keys: Vec<_> = ops
598            .iter()
599            .filter_map(|op| {
600                if let TxnOp::Put(key, _) = op {
601                    TopicRegionKey::from_bytes(key).ok()
602                } else {
603                    None
604                }
605            })
606            .collect();
607
608        assert_eq!(keys.len(), 2);
609        let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect();
610        assert!(region_ids.contains(&RegionId::new(table_id, 0)));
611        assert!(region_ids.contains(&RegionId::new(table_id, 1)));
612        assert!(!region_ids.contains(&RegionId::new(table_id, 2)));
613
614        // Verify topics are correct
615        for key in keys {
616            match key.region_id.region_number() {
617                0 => assert_eq!(key.topic, "topic_0"),
618                1 => assert_eq!(key.topic, "topic_1"),
619                _ => panic!("Unexpected region number"),
620            }
621        }
622    }
623
624    #[test]
625    fn test_build_update_txn_add_new_region() {
626        let kv_backend = Arc::new(MemoryKvBackend::default());
627        let manager = TopicRegionManager::new(kv_backend.clone());
628        let table_id = 1;
629        let old_region_wal_options = vec![(
630            0,
631            WalOptions::Kafka(KafkaWalOptions {
632                topic: "topic_0".to_string(),
633            }),
634        )]
635        .into_iter()
636        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
637        .collect::<HashMap<_, _>>();
638        let new_region_wal_options = vec![
639            (
640                0,
641                WalOptions::Kafka(KafkaWalOptions {
642                    topic: "topic_0".to_string(),
643                }),
644            ),
645            (
646                1,
647                WalOptions::Kafka(KafkaWalOptions {
648                    topic: "topic_1".to_string(),
649                }),
650            ),
651        ]
652        .into_iter()
653        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
654        .collect::<HashMap<_, _>>();
655        let txn = manager
656            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
657            .unwrap();
658        let ops = txn.req().success.clone();
659        // Should only have Put for new region 1 (region 0 unchanged)
660        assert_eq!(ops.len(), 1);
661        if let TxnOp::Put(key, _) = &ops[0] {
662            let topic_key = TopicRegionKey::from_bytes(key).unwrap();
663            assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
664            assert_eq!(topic_key.topic, "topic_1");
665        } else {
666            panic!("Expected Put operation");
667        }
668    }
669
670    #[test]
671    fn test_build_update_txn_remove_region() {
672        let kv_backend = Arc::new(MemoryKvBackend::default());
673        let manager = TopicRegionManager::new(kv_backend.clone());
674        let table_id = 1;
675        let old_region_wal_options = vec![
676            (
677                0,
678                WalOptions::Kafka(KafkaWalOptions {
679                    topic: "topic_0".to_string(),
680                }),
681            ),
682            (
683                1,
684                WalOptions::Kafka(KafkaWalOptions {
685                    topic: "topic_1".to_string(),
686                }),
687            ),
688        ]
689        .into_iter()
690        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
691        .collect::<HashMap<_, _>>();
692        let new_region_wal_options = vec![(
693            0,
694            WalOptions::Kafka(KafkaWalOptions {
695                topic: "topic_0".to_string(),
696            }),
697        )]
698        .into_iter()
699        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
700        .collect::<HashMap<_, _>>();
701        let txn = manager
702            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
703            .unwrap();
704        let ops = txn.req().success.clone();
705        // Should only have Delete for removed region 1 (region 0 unchanged)
706        assert_eq!(ops.len(), 1);
707        match &ops[0] {
708            TxnOp::Delete(key) => {
709                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
710                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
711                assert_eq!(topic_key.topic, "topic_1");
712            }
713            TxnOp::Put(_, _) | TxnOp::Get(_) => {
714                panic!("Expected Delete operation");
715            }
716        }
717    }
718
719    #[test]
720    fn test_build_update_txn_change_topic() {
721        let kv_backend = Arc::new(MemoryKvBackend::default());
722        let manager = TopicRegionManager::new(kv_backend.clone());
723        let table_id = 1;
724        let old_region_wal_options = vec![(
725            0,
726            WalOptions::Kafka(KafkaWalOptions {
727                topic: "topic_0".to_string(),
728            }),
729        )]
730        .into_iter()
731        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
732        .collect::<HashMap<_, _>>();
733        let new_region_wal_options = vec![(
734            0,
735            WalOptions::Kafka(KafkaWalOptions {
736                topic: "topic_0_new".to_string(),
737            }),
738        )]
739        .into_iter()
740        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
741        .collect::<HashMap<_, _>>();
742        let txn = manager
743            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
744            .unwrap();
745        let ops = txn.req().success.clone();
746        // Should have Delete for old topic and Put for new topic
747        assert_eq!(ops.len(), 2);
748
749        let mut delete_found = false;
750        let mut put_found = false;
751        for op in ops {
752            match op {
753                TxnOp::Delete(key) => {
754                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
755                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
756                    assert_eq!(topic_key.topic, "topic_0");
757                    delete_found = true;
758                }
759                TxnOp::Put(key, _) => {
760                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
761                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
762                    assert_eq!(topic_key.topic, "topic_0_new");
763                    put_found = true;
764                }
765                TxnOp::Get(_) => {
766                    // Get operations shouldn't appear in this context
767                    panic!("Unexpected Get operation in update transaction");
768                }
769            }
770        }
771        assert!(delete_found, "Expected Delete operation for old topic");
772        assert!(put_found, "Expected Put operation for new topic");
773    }
774
775    #[test]
776    fn test_build_update_txn_no_change() {
777        let kv_backend = Arc::new(MemoryKvBackend::default());
778        let manager = TopicRegionManager::new(kv_backend.clone());
779        let table_id = 1;
780        let region_wal_options = vec![
781            (
782                0,
783                WalOptions::Kafka(KafkaWalOptions {
784                    topic: "topic_0".to_string(),
785                }),
786            ),
787            (
788                1,
789                WalOptions::Kafka(KafkaWalOptions {
790                    topic: "topic_1".to_string(),
791                }),
792            ),
793        ]
794        .into_iter()
795        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
796        .collect::<HashMap<_, _>>();
797        let txn = manager
798            .build_update_txn(table_id, &region_wal_options, &region_wal_options)
799            .unwrap();
800        // Should have no operations when nothing changes (preserves checkpoint)
801        let ops = txn.req().success.clone();
802        assert_eq!(ops.len(), 0);
803    }
804
805    #[test]
806    fn test_build_update_txn_mixed_scenarios() {
807        let kv_backend = Arc::new(MemoryKvBackend::default());
808        let manager = TopicRegionManager::new(kv_backend.clone());
809        let table_id = 1;
810        let old_region_wal_options = vec![
811            (
812                0,
813                WalOptions::Kafka(KafkaWalOptions {
814                    topic: "topic_0".to_string(),
815                }),
816            ),
817            (
818                1,
819                WalOptions::Kafka(KafkaWalOptions {
820                    topic: "topic_1".to_string(),
821                }),
822            ),
823            (
824                2,
825                WalOptions::Kafka(KafkaWalOptions {
826                    topic: "topic_2".to_string(),
827                }),
828            ),
829        ]
830        .into_iter()
831        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
832        .collect::<HashMap<_, _>>();
833        let new_region_wal_options = vec![
834            (
835                0,
836                WalOptions::Kafka(KafkaWalOptions {
837                    topic: "topic_0".to_string(), // Unchanged
838                }),
839            ),
840            (
841                1,
842                WalOptions::Kafka(KafkaWalOptions {
843                    topic: "topic_1_new".to_string(), // Topic changed
844                }),
845            ),
846            // Region 2 removed
847            (
848                3,
849                WalOptions::Kafka(KafkaWalOptions {
850                    topic: "topic_3".to_string(), // New region
851                }),
852            ),
853        ]
854        .into_iter()
855        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
856        .collect::<HashMap<_, _>>();
857        let txn = manager
858            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
859            .unwrap();
860
861        let ops = txn.req().success.clone();
862        // Should have:
863        // - Delete for region 2 (removed)
864        // - Delete for region 1 old topic (topic changed)
865        // - Put for region 1 new topic (topic changed)
866        // - Put for region 3 (new)
867        // Region 0 unchanged, so no operation
868        assert_eq!(ops.len(), 4);
869
870        let mut delete_ops = 0;
871        let mut put_ops = 0;
872        let mut delete_region_2 = false;
873        let mut delete_region_1_old = false;
874        let mut put_region_1_new = false;
875        let mut put_region_3 = false;
876
877        for op in ops {
878            match op {
879                TxnOp::Delete(key) => {
880                    delete_ops += 1;
881                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
882                    match topic_key.region_id.region_number() {
883                        1 => {
884                            assert_eq!(topic_key.topic, "topic_1");
885                            delete_region_1_old = true;
886                        }
887                        2 => {
888                            assert_eq!(topic_key.topic, "topic_2");
889                            delete_region_2 = true;
890                        }
891                        _ => panic!("Unexpected delete operation for region"),
892                    }
893                }
894                TxnOp::Put(key, _) => {
895                    put_ops += 1;
896                    let topic_key: TopicRegionKey<'_> = TopicRegionKey::from_bytes(&key).unwrap();
897                    match topic_key.region_id.region_number() {
898                        1 => {
899                            assert_eq!(topic_key.topic, "topic_1_new");
900                            put_region_1_new = true;
901                        }
902                        3 => {
903                            assert_eq!(topic_key.topic, "topic_3");
904                            put_region_3 = true;
905                        }
906                        _ => panic!("Unexpected put operation for region"),
907                    }
908                }
909                TxnOp::Get(_) => {
910                    panic!("Unexpected Get operation in update transaction");
911                }
912            }
913        }
914
915        assert_eq!(delete_ops, 2);
916        assert_eq!(put_ops, 2);
917        assert!(delete_region_2, "Expected delete for removed region 2");
918        assert!(
919            delete_region_1_old,
920            "Expected delete for region 1 old topic"
921        );
922        assert!(put_region_1_new, "Expected put for region 1 new topic");
923        assert!(put_region_3, "Expected put for new region 3");
924    }
925
926    #[test]
927    fn test_build_update_txn_with_raft_engine() {
928        let kv_backend = Arc::new(MemoryKvBackend::default());
929        let manager = TopicRegionManager::new(kv_backend.clone());
930        let table_id = 1;
931        let old_region_wal_options = vec![
932            (
933                0,
934                WalOptions::Kafka(KafkaWalOptions {
935                    topic: "topic_0".to_string(),
936                }),
937            ),
938            (1, WalOptions::RaftEngine), // Should be ignored
939        ]
940        .into_iter()
941        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
942        .collect::<HashMap<_, _>>();
943        let new_region_wal_options = vec![
944            (
945                0,
946                WalOptions::Kafka(KafkaWalOptions {
947                    topic: "topic_0".to_string(),
948                }),
949            ),
950            (
951                1,
952                WalOptions::Kafka(KafkaWalOptions {
953                    topic: "topic_1".to_string(), // Changed from RaftEngine to Kafka
954                }),
955            ),
956        ]
957        .into_iter()
958        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
959        .collect::<HashMap<_, _>>();
960        let txn = manager
961            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
962            .unwrap();
963        let ops = txn.req().success.clone();
964        // Should only have Put for region 1 (new Kafka topic)
965        // Region 0 unchanged, so no operation
966        // Region 1 was RaftEngine before (not tracked), so only Put needed
967        assert_eq!(ops.len(), 1);
968        match &ops[0] {
969            TxnOp::Put(key, _) => {
970                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
971                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
972                assert_eq!(topic_key.topic, "topic_1");
973            }
974            TxnOp::Delete(_) | TxnOp::Get(_) => {
975                panic!("Expected Put operation for new Kafka region");
976            }
977        }
978    }
979}