common_meta/key/
topic_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::ddl::utils::parse_region_wal_options;
25use crate::error::{Error, InvalidMetadataSnafu, Result};
26use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
27use crate::kv_backend::KvBackendRef;
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::rpc::KeyValue;
30use crate::rpc::store::{
31    BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
32};
33
34// The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
35// The layout of the key is `__topic_region/{topic_name}/{region_id}`.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct TopicRegionKey<'a> {
38    pub region_id: RegionId,
39    pub topic: &'a str,
40}
41
42/// Represents additional information for a region when using a shared WAL.
43#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
44pub struct TopicRegionValue {
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub checkpoint: Option<ReplayCheckpoint>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
50pub struct ReplayCheckpoint {
51    #[serde(default)]
52    pub entry_id: u64,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub metadata_entry_id: Option<u64>,
55}
56
57impl<'a> TopicRegionKey<'a> {
58    pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59        Self { region_id, topic }
60    }
61
62    pub fn range_topic_key(topic: &str) -> String {
63        format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64    }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68    fn to_bytes(&self) -> Vec<u8> {
69        self.to_string().into_bytes()
70    }
71
72    fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73        let key = std::str::from_utf8(bytes).map_err(|e| {
74            InvalidMetadataSnafu {
75                err_msg: format!(
76                    "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77                    String::from_utf8_lossy(bytes)
78                ),
79            }
80            .build()
81        })?;
82        TopicRegionKey::try_from(key)
83    }
84}
85
86impl Display for TopicRegionKey<'_> {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        write!(
89            f,
90            "{}{}",
91            Self::range_topic_key(self.topic),
92            self.region_id.as_u64()
93        )
94    }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98    type Error = Error;
99
100    /// Value is of format `{prefix}/{topic}/{region_id}`
101    fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102        let captures = TOPIC_REGION_PATTERN
103            .captures(value)
104            .context(InvalidMetadataSnafu {
105                err_msg: format!("Invalid TopicRegionKey: {}", value),
106            })?;
107        let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108        let region_id = captures[2].parse::<u64>().map_err(|_| {
109            InvalidMetadataSnafu {
110                err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111            }
112            .build()
113        })?;
114        Ok(TopicRegionKey {
115            region_id: RegionId::from_u64(region_id),
116            topic,
117        })
118    }
119}
120
121impl ReplayCheckpoint {
122    /// Creates a new [`ReplayCheckpoint`] with the given entry id and metadata entry id.
123    pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124        Self {
125            entry_id,
126            metadata_entry_id,
127        }
128    }
129}
130
131impl TopicRegionValue {
132    /// Creates a new [`TopicRegionValue`] with the given checkpoint.
133    pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
134        Self { checkpoint }
135    }
136
137    /// Returns the minimum entry id of the region.
138    ///
139    /// If the metadata entry id is not set, it returns the entry id.
140    pub fn min_entry_id(&self) -> Option<u64> {
141        match self.checkpoint {
142            Some(ReplayCheckpoint {
143                entry_id,
144                metadata_entry_id,
145            }) => match metadata_entry_id {
146                Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
147                None => Some(entry_id),
148            },
149            None => None,
150        }
151    }
152}
153
154fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
155    let key = TopicRegionKey::from_bytes(&value.key)?;
156    let value = if value.value.is_empty() {
157        TopicRegionValue::default()
158    } else {
159        TopicRegionValue::try_from_raw_value(&value.value)?
160    };
161    Ok((key, value))
162}
163
164/// Manages map of topics and regions in kvbackend.
165pub struct TopicRegionManager {
166    kv_backend: KvBackendRef,
167}
168
169impl TopicRegionManager {
170    pub fn new(kv_backend: KvBackendRef) -> Self {
171        Self { kv_backend }
172    }
173
174    pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
175        let put_req = PutRequest {
176            key: key.to_bytes(),
177            value: vec![],
178            prev_kv: false,
179        };
180        self.kv_backend.put(put_req).await?;
181        Ok(())
182    }
183
184    pub async fn batch_get(
185        &self,
186        keys: Vec<TopicRegionKey<'_>>,
187    ) -> Result<HashMap<RegionId, TopicRegionValue>> {
188        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
189        let req = BatchGetRequest { keys: raw_keys };
190        let resp = self.kv_backend.batch_get(req).await?;
191
192        let v = resp
193            .kvs
194            .into_iter()
195            .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
196            .collect::<Result<HashMap<_, _>>>()?;
197
198        Ok(v)
199    }
200
201    pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
202        let key_bytes = key.to_bytes();
203        let resp = self.kv_backend.get(&key_bytes).await?;
204        let value = resp
205            .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
206            .transpose()?;
207
208        Ok(value)
209    }
210
211    pub async fn batch_put(
212        &self,
213        keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
214    ) -> Result<()> {
215        let req = BatchPutRequest {
216            kvs: keys
217                .iter()
218                .map(|(key, value)| {
219                    let value = value
220                        .map(|v| v.try_as_raw_value())
221                        .transpose()?
222                        .unwrap_or_default();
223
224                    Ok(KeyValue {
225                        key: key.to_bytes(),
226                        value,
227                    })
228                })
229                .collect::<Result<Vec<_>>>()?,
230            prev_kv: false,
231        };
232        self.kv_backend.batch_put(req).await?;
233        Ok(())
234    }
235
236    /// Build a create topic region mapping transaction. It only executes while the primary keys comparing successes.
237    pub fn build_create_txn(
238        &self,
239        table_id: TableId,
240        region_wal_options: &HashMap<RegionNumber, String>,
241    ) -> Result<Txn> {
242        let region_wal_options = parse_region_wal_options(region_wal_options)?;
243        let topic_region_mapping = self.get_topic_region_mapping(table_id, &region_wal_options);
244        let topic_region_keys = topic_region_mapping
245            .iter()
246            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
247            .collect::<Vec<_>>();
248        let operations = topic_region_keys
249            .into_iter()
250            .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
251            .collect::<Vec<_>>();
252        Ok(Txn::new().and_then(operations))
253    }
254
255    /// Build a update topic region mapping transaction.
256    pub fn build_update_txn(
257        &self,
258        table_id: TableId,
259        old_region_wal_options: &HashMap<RegionNumber, String>,
260        new_region_wal_options: &HashMap<RegionNumber, String>,
261    ) -> Result<Txn> {
262        let old_wal_options_parsed = parse_region_wal_options(old_region_wal_options)?;
263        let new_wal_options_parsed = parse_region_wal_options(new_region_wal_options)?;
264        let old_mapping = self.get_topic_region_mapping(table_id, &old_wal_options_parsed);
265        let new_mapping = self.get_topic_region_mapping(table_id, &new_wal_options_parsed);
266
267        // Convert to HashMap for easier lookup: RegionId -> Topic
268        let old_map: HashMap<RegionId, &str> = old_mapping.into_iter().collect();
269        let new_map: HashMap<RegionId, &str> = new_mapping.into_iter().collect();
270        let mut ops = Vec::new();
271
272        // Check for deletes (in old but not in new, or topic changed)
273        for (region_id, old_topic) in &old_map {
274            match new_map.get(region_id) {
275                Some(new_topic) if *new_topic == *old_topic => {
276                    // Same topic, do nothing (preserve checkpoint)
277                }
278                _ => {
279                    // Removed or topic changed -> Delete old
280                    let key = TopicRegionKey::new(*region_id, old_topic);
281                    ops.push(TxnOp::Delete(key.to_bytes()));
282                }
283            }
284        }
285
286        // Check for adds (in new but not in old, or topic changed)
287        for (region_id, new_topic) in &new_map {
288            match old_map.get(region_id) {
289                Some(old_topic) if *old_topic == *new_topic => {
290                    // Same topic, already handled (do nothing)
291                }
292                _ => {
293                    // New or topic changed -> Put new
294                    let key = TopicRegionKey::new(*region_id, new_topic);
295                    // Initialize with empty value (default TopicRegionValue)
296                    ops.push(TxnOp::Put(key.to_bytes(), vec![]));
297                }
298            }
299        }
300
301        Ok(Txn::new().and_then(ops))
302    }
303
304    /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`].
305    pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
306        let prefix = TopicRegionKey::range_topic_key(topic);
307        let req = RangeRequest::new().with_prefix(prefix.as_bytes());
308        let resp = self.kv_backend.range(req).await?;
309        let region_ids = resp
310            .kvs
311            .iter()
312            .map(topic_region_decoder)
313            .collect::<Result<Vec<_>>>()?;
314        Ok(region_ids
315            .into_iter()
316            .map(|(key, value)| (key.region_id, value))
317            .collect())
318    }
319
320    pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
321        let raw_key = key.to_bytes();
322        self.kv_backend.delete(&raw_key, false).await?;
323        Ok(())
324    }
325
326    pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
327        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
328        let req = BatchDeleteRequest {
329            keys: raw_keys,
330            prev_kv: false,
331        };
332        self.kv_backend.batch_delete(req).await?;
333        Ok(())
334    }
335
336    /// Retrieves a mapping of [`RegionId`]s to their corresponding topics name
337    /// based on the provided table ID and WAL options.
338    ///
339    /// # Returns
340    /// A vector of tuples, where each tuple contains a [`RegionId`] and its corresponding topic name.
341    pub fn get_topic_region_mapping<'a>(
342        &self,
343        table_id: TableId,
344        region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
345    ) -> Vec<(RegionId, &'a str)> {
346        region_wal_options
347            .keys()
348            .filter_map(
349                |region_number| match region_wal_options.get(region_number) {
350                    Some(WalOptions::Kafka(kafka)) => {
351                        let region_id = RegionId::new(table_id, *region_number);
352                        Some((region_id, kafka.topic.as_str()))
353                    }
354                    Some(WalOptions::RaftEngine) => None,
355                    Some(WalOptions::Noop) => None,
356                    None => None,
357                },
358            )
359            .collect::<Vec<_>>()
360    }
361}
362
363#[cfg(test)]
364mod tests {
365    use std::sync::Arc;
366
367    use common_wal::options::KafkaWalOptions;
368
369    use super::*;
370    use crate::kv_backend::memory::MemoryKvBackend;
371
372    #[tokio::test]
373    async fn test_topic_region_manager() {
374        let kv_backend = Arc::new(MemoryKvBackend::default());
375        let manager = TopicRegionManager::new(kv_backend.clone());
376
377        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
378        let keys = (0..64)
379            .map(|i| {
380                (
381                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
382                    None,
383                )
384            })
385            .collect::<Vec<_>>();
386
387        manager.batch_put(&keys).await.unwrap();
388        let mut key_values = manager
389            .regions(&topics[0])
390            .await
391            .unwrap()
392            .into_keys()
393            .collect::<Vec<_>>();
394        let expected = keys
395            .iter()
396            .filter_map(|(key, _)| {
397                if key.topic == topics[0] {
398                    Some(key.region_id)
399                } else {
400                    None
401                }
402            })
403            .collect::<Vec<_>>();
404        key_values.sort_by_key(|id| id.as_u64());
405        assert_eq!(key_values, expected);
406
407        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
408        manager.delete(key.clone()).await.unwrap();
409        let mut key_values = manager
410            .regions(&topics[0])
411            .await
412            .unwrap()
413            .into_keys()
414            .collect::<Vec<_>>();
415        let expected = keys
416            .iter()
417            .filter_map(|(key, _)| {
418                if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
419                    Some(key.region_id)
420                } else {
421                    None
422                }
423            })
424            .collect::<Vec<_>>();
425        key_values.sort_by_key(|id| id.as_u64());
426        assert_eq!(key_values, expected);
427    }
428
429    #[test]
430    fn test_topic_region_map() {
431        let kv_backend = Arc::new(MemoryKvBackend::default());
432        let manager = TopicRegionManager::new(kv_backend.clone());
433
434        let table_id = 1;
435        let region_wal_options = (0..64)
436            .map(|i| {
437                let region_number = i;
438                let wal_options = if i % 2 == 0 {
439                    WalOptions::Kafka(KafkaWalOptions {
440                        topic: format!("topic_{}", i),
441                    })
442                } else {
443                    WalOptions::RaftEngine
444                };
445                (region_number, serde_json::to_string(&wal_options).unwrap())
446            })
447            .collect::<HashMap<_, _>>();
448
449        let region_wal_options = parse_region_wal_options(&region_wal_options).unwrap();
450        let mut topic_region_mapping =
451            manager.get_topic_region_mapping(table_id, &region_wal_options);
452        let mut expected = (0..64)
453            .filter_map(|i| {
454                if i % 2 == 0 {
455                    Some((RegionId::new(table_id, i), format!("topic_{}", i)))
456                } else {
457                    None
458                }
459            })
460            .collect::<Vec<_>>();
461        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
462        let topic_region_map = topic_region_mapping
463            .iter()
464            .map(|(region_id, topic)| (*region_id, topic.to_string()))
465            .collect::<Vec<_>>();
466        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
467        assert_eq!(topic_region_map, expected);
468    }
469
470    #[test]
471    fn test_topic_region_key_is_match() {
472        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
473        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
474        assert_eq!(
475            topic_region_key.topic,
476            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
477        );
478        assert_eq!(
479            topic_region_key.region_id,
480            RegionId::from_u64(4410931412992)
481        );
482    }
483
484    #[test]
485    fn test_build_create_txn() {
486        let kv_backend = Arc::new(MemoryKvBackend::default());
487        let manager = TopicRegionManager::new(kv_backend.clone());
488        let table_id = 1;
489        let region_wal_options = vec![
490            (
491                0,
492                WalOptions::Kafka(KafkaWalOptions {
493                    topic: "topic_0".to_string(),
494                }),
495            ),
496            (
497                1,
498                WalOptions::Kafka(KafkaWalOptions {
499                    topic: "topic_1".to_string(),
500                }),
501            ),
502            (2, WalOptions::RaftEngine), // Should be ignored
503        ]
504        .into_iter()
505        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
506        .collect::<HashMap<_, _>>();
507
508        let txn = manager
509            .build_create_txn(table_id, &region_wal_options)
510            .unwrap();
511
512        // Verify the transaction contains correct operations
513        // Should create mappings for region 0 and 1, but not region 2 (RaftEngine)
514        let ops = txn.req().success.clone();
515        assert_eq!(ops.len(), 2);
516
517        let keys: Vec<_> = ops
518            .iter()
519            .filter_map(|op| {
520                if let TxnOp::Put(key, _) = op {
521                    TopicRegionKey::from_bytes(key).ok()
522                } else {
523                    None
524                }
525            })
526            .collect();
527
528        assert_eq!(keys.len(), 2);
529        let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect();
530        assert!(region_ids.contains(&RegionId::new(table_id, 0)));
531        assert!(region_ids.contains(&RegionId::new(table_id, 1)));
532        assert!(!region_ids.contains(&RegionId::new(table_id, 2)));
533
534        // Verify topics are correct
535        for key in keys {
536            match key.region_id.region_number() {
537                0 => assert_eq!(key.topic, "topic_0"),
538                1 => assert_eq!(key.topic, "topic_1"),
539                _ => panic!("Unexpected region number"),
540            }
541        }
542    }
543
544    #[test]
545    fn test_build_update_txn_add_new_region() {
546        let kv_backend = Arc::new(MemoryKvBackend::default());
547        let manager = TopicRegionManager::new(kv_backend.clone());
548        let table_id = 1;
549        let old_region_wal_options = vec![(
550            0,
551            WalOptions::Kafka(KafkaWalOptions {
552                topic: "topic_0".to_string(),
553            }),
554        )]
555        .into_iter()
556        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
557        .collect::<HashMap<_, _>>();
558        let new_region_wal_options = vec![
559            (
560                0,
561                WalOptions::Kafka(KafkaWalOptions {
562                    topic: "topic_0".to_string(),
563                }),
564            ),
565            (
566                1,
567                WalOptions::Kafka(KafkaWalOptions {
568                    topic: "topic_1".to_string(),
569                }),
570            ),
571        ]
572        .into_iter()
573        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
574        .collect::<HashMap<_, _>>();
575        let txn = manager
576            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
577            .unwrap();
578        let ops = txn.req().success.clone();
579        // Should only have Put for new region 1 (region 0 unchanged)
580        assert_eq!(ops.len(), 1);
581        if let TxnOp::Put(key, _) = &ops[0] {
582            let topic_key = TopicRegionKey::from_bytes(key).unwrap();
583            assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
584            assert_eq!(topic_key.topic, "topic_1");
585        } else {
586            panic!("Expected Put operation");
587        }
588    }
589
590    #[test]
591    fn test_build_update_txn_remove_region() {
592        let kv_backend = Arc::new(MemoryKvBackend::default());
593        let manager = TopicRegionManager::new(kv_backend.clone());
594        let table_id = 1;
595        let old_region_wal_options = vec![
596            (
597                0,
598                WalOptions::Kafka(KafkaWalOptions {
599                    topic: "topic_0".to_string(),
600                }),
601            ),
602            (
603                1,
604                WalOptions::Kafka(KafkaWalOptions {
605                    topic: "topic_1".to_string(),
606                }),
607            ),
608        ]
609        .into_iter()
610        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
611        .collect::<HashMap<_, _>>();
612        let new_region_wal_options = vec![(
613            0,
614            WalOptions::Kafka(KafkaWalOptions {
615                topic: "topic_0".to_string(),
616            }),
617        )]
618        .into_iter()
619        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
620        .collect::<HashMap<_, _>>();
621        let txn = manager
622            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
623            .unwrap();
624        let ops = txn.req().success.clone();
625        // Should only have Delete for removed region 1 (region 0 unchanged)
626        assert_eq!(ops.len(), 1);
627        match &ops[0] {
628            TxnOp::Delete(key) => {
629                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
630                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
631                assert_eq!(topic_key.topic, "topic_1");
632            }
633            TxnOp::Put(_, _) | TxnOp::Get(_) => {
634                panic!("Expected Delete operation");
635            }
636        }
637    }
638
639    #[test]
640    fn test_build_update_txn_change_topic() {
641        let kv_backend = Arc::new(MemoryKvBackend::default());
642        let manager = TopicRegionManager::new(kv_backend.clone());
643        let table_id = 1;
644        let old_region_wal_options = vec![(
645            0,
646            WalOptions::Kafka(KafkaWalOptions {
647                topic: "topic_0".to_string(),
648            }),
649        )]
650        .into_iter()
651        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
652        .collect::<HashMap<_, _>>();
653        let new_region_wal_options = vec![(
654            0,
655            WalOptions::Kafka(KafkaWalOptions {
656                topic: "topic_0_new".to_string(),
657            }),
658        )]
659        .into_iter()
660        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
661        .collect::<HashMap<_, _>>();
662        let txn = manager
663            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
664            .unwrap();
665        let ops = txn.req().success.clone();
666        // Should have Delete for old topic and Put for new topic
667        assert_eq!(ops.len(), 2);
668
669        let mut delete_found = false;
670        let mut put_found = false;
671        for op in ops {
672            match op {
673                TxnOp::Delete(key) => {
674                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
675                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
676                    assert_eq!(topic_key.topic, "topic_0");
677                    delete_found = true;
678                }
679                TxnOp::Put(key, _) => {
680                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
681                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
682                    assert_eq!(topic_key.topic, "topic_0_new");
683                    put_found = true;
684                }
685                TxnOp::Get(_) => {
686                    // Get operations shouldn't appear in this context
687                    panic!("Unexpected Get operation in update transaction");
688                }
689            }
690        }
691        assert!(delete_found, "Expected Delete operation for old topic");
692        assert!(put_found, "Expected Put operation for new topic");
693    }
694
695    #[test]
696    fn test_build_update_txn_no_change() {
697        let kv_backend = Arc::new(MemoryKvBackend::default());
698        let manager = TopicRegionManager::new(kv_backend.clone());
699        let table_id = 1;
700        let region_wal_options = vec![
701            (
702                0,
703                WalOptions::Kafka(KafkaWalOptions {
704                    topic: "topic_0".to_string(),
705                }),
706            ),
707            (
708                1,
709                WalOptions::Kafka(KafkaWalOptions {
710                    topic: "topic_1".to_string(),
711                }),
712            ),
713        ]
714        .into_iter()
715        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
716        .collect::<HashMap<_, _>>();
717        let txn = manager
718            .build_update_txn(table_id, &region_wal_options, &region_wal_options)
719            .unwrap();
720        // Should have no operations when nothing changes (preserves checkpoint)
721        let ops = txn.req().success.clone();
722        assert_eq!(ops.len(), 0);
723    }
724
725    #[test]
726    fn test_build_update_txn_mixed_scenarios() {
727        let kv_backend = Arc::new(MemoryKvBackend::default());
728        let manager = TopicRegionManager::new(kv_backend.clone());
729        let table_id = 1;
730        let old_region_wal_options = vec![
731            (
732                0,
733                WalOptions::Kafka(KafkaWalOptions {
734                    topic: "topic_0".to_string(),
735                }),
736            ),
737            (
738                1,
739                WalOptions::Kafka(KafkaWalOptions {
740                    topic: "topic_1".to_string(),
741                }),
742            ),
743            (
744                2,
745                WalOptions::Kafka(KafkaWalOptions {
746                    topic: "topic_2".to_string(),
747                }),
748            ),
749        ]
750        .into_iter()
751        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
752        .collect::<HashMap<_, _>>();
753        let new_region_wal_options = vec![
754            (
755                0,
756                WalOptions::Kafka(KafkaWalOptions {
757                    topic: "topic_0".to_string(), // Unchanged
758                }),
759            ),
760            (
761                1,
762                WalOptions::Kafka(KafkaWalOptions {
763                    topic: "topic_1_new".to_string(), // Topic changed
764                }),
765            ),
766            // Region 2 removed
767            (
768                3,
769                WalOptions::Kafka(KafkaWalOptions {
770                    topic: "topic_3".to_string(), // New region
771                }),
772            ),
773        ]
774        .into_iter()
775        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
776        .collect::<HashMap<_, _>>();
777        let txn = manager
778            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
779            .unwrap();
780
781        let ops = txn.req().success.clone();
782        // Should have:
783        // - Delete for region 2 (removed)
784        // - Delete for region 1 old topic (topic changed)
785        // - Put for region 1 new topic (topic changed)
786        // - Put for region 3 (new)
787        // Region 0 unchanged, so no operation
788        assert_eq!(ops.len(), 4);
789
790        let mut delete_ops = 0;
791        let mut put_ops = 0;
792        let mut delete_region_2 = false;
793        let mut delete_region_1_old = false;
794        let mut put_region_1_new = false;
795        let mut put_region_3 = false;
796
797        for op in ops {
798            match op {
799                TxnOp::Delete(key) => {
800                    delete_ops += 1;
801                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
802                    match topic_key.region_id.region_number() {
803                        1 => {
804                            assert_eq!(topic_key.topic, "topic_1");
805                            delete_region_1_old = true;
806                        }
807                        2 => {
808                            assert_eq!(topic_key.topic, "topic_2");
809                            delete_region_2 = true;
810                        }
811                        _ => panic!("Unexpected delete operation for region"),
812                    }
813                }
814                TxnOp::Put(key, _) => {
815                    put_ops += 1;
816                    let topic_key: TopicRegionKey<'_> = TopicRegionKey::from_bytes(&key).unwrap();
817                    match topic_key.region_id.region_number() {
818                        1 => {
819                            assert_eq!(topic_key.topic, "topic_1_new");
820                            put_region_1_new = true;
821                        }
822                        3 => {
823                            assert_eq!(topic_key.topic, "topic_3");
824                            put_region_3 = true;
825                        }
826                        _ => panic!("Unexpected put operation for region"),
827                    }
828                }
829                TxnOp::Get(_) => {
830                    panic!("Unexpected Get operation in update transaction");
831                }
832            }
833        }
834
835        assert_eq!(delete_ops, 2);
836        assert_eq!(put_ops, 2);
837        assert!(delete_region_2, "Expected delete for removed region 2");
838        assert!(
839            delete_region_1_old,
840            "Expected delete for region 1 old topic"
841        );
842        assert!(put_region_1_new, "Expected put for region 1 new topic");
843        assert!(put_region_3, "Expected put for new region 3");
844    }
845
846    #[test]
847    fn test_build_update_txn_with_raft_engine() {
848        let kv_backend = Arc::new(MemoryKvBackend::default());
849        let manager = TopicRegionManager::new(kv_backend.clone());
850        let table_id = 1;
851        let old_region_wal_options = vec![
852            (
853                0,
854                WalOptions::Kafka(KafkaWalOptions {
855                    topic: "topic_0".to_string(),
856                }),
857            ),
858            (1, WalOptions::RaftEngine), // Should be ignored
859        ]
860        .into_iter()
861        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
862        .collect::<HashMap<_, _>>();
863        let new_region_wal_options = vec![
864            (
865                0,
866                WalOptions::Kafka(KafkaWalOptions {
867                    topic: "topic_0".to_string(),
868                }),
869            ),
870            (
871                1,
872                WalOptions::Kafka(KafkaWalOptions {
873                    topic: "topic_1".to_string(), // Changed from RaftEngine to Kafka
874                }),
875            ),
876        ]
877        .into_iter()
878        .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
879        .collect::<HashMap<_, _>>();
880        let txn = manager
881            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
882            .unwrap();
883        let ops = txn.req().success.clone();
884        // Should only have Put for region 1 (new Kafka topic)
885        // Region 0 unchanged, so no operation
886        // Region 1 was RaftEngine before (not tracked), so only Put needed
887        assert_eq!(ops.len(), 1);
888        match &ops[0] {
889            TxnOp::Put(key, _) => {
890                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
891                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
892                assert_eq!(topic_key.topic, "topic_1");
893            }
894            TxnOp::Delete(_) | TxnOp::Get(_) => {
895                panic!("Expected Put operation for new Kafka region");
896            }
897        }
898    }
899}