Skip to main content

common_meta/key/
topic_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionId;
22use table::metadata::TableId;
23
24use crate::error::{Error, InvalidMetadataSnafu, Result};
25use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
26use crate::kv_backend::KvBackendRef;
27use crate::kv_backend::txn::{Txn, TxnOp};
28use crate::rpc::KeyValue;
29use crate::rpc::store::{
30    BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
31};
32use crate::wal_provider::RegionWalOptions;
33
34// The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
35// The layout of the key is `__topic_region/{topic_name}/{region_id}`.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct TopicRegionKey<'a> {
38    pub region_id: RegionId,
39    pub topic: &'a str,
40}
41
42/// Represents additional information for a region when using a shared WAL.
43#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
44pub struct TopicRegionValue {
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub checkpoint: Option<ReplayCheckpoint>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
50pub struct ReplayCheckpoint {
51    #[serde(default)]
52    pub entry_id: u64,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub metadata_entry_id: Option<u64>,
55}
56
57impl<'a> TopicRegionKey<'a> {
58    pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59        Self { region_id, topic }
60    }
61
62    pub fn range_topic_key(topic: &str) -> String {
63        format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64    }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68    fn to_bytes(&self) -> Vec<u8> {
69        self.to_string().into_bytes()
70    }
71
72    fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73        let key = std::str::from_utf8(bytes).map_err(|e| {
74            InvalidMetadataSnafu {
75                err_msg: format!(
76                    "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77                    String::from_utf8_lossy(bytes)
78                ),
79            }
80            .build()
81        })?;
82        TopicRegionKey::try_from(key)
83    }
84}
85
86impl Display for TopicRegionKey<'_> {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        write!(
89            f,
90            "{}{}",
91            Self::range_topic_key(self.topic),
92            self.region_id.as_u64()
93        )
94    }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98    type Error = Error;
99
100    /// Value is of format `{prefix}/{topic}/{region_id}`
101    fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102        let captures = TOPIC_REGION_PATTERN
103            .captures(value)
104            .context(InvalidMetadataSnafu {
105                err_msg: format!("Invalid TopicRegionKey: {}", value),
106            })?;
107        let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108        let region_id = captures[2].parse::<u64>().map_err(|_| {
109            InvalidMetadataSnafu {
110                err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111            }
112            .build()
113        })?;
114        Ok(TopicRegionKey {
115            region_id: RegionId::from_u64(region_id),
116            topic,
117        })
118    }
119}
120
121impl ReplayCheckpoint {
122    /// Creates a new [`ReplayCheckpoint`] with the given entry id and metadata entry id.
123    pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124        Self {
125            entry_id,
126            metadata_entry_id,
127        }
128    }
129
130    /// Merges the checkpoint with the topic pruned entry id.
131    pub fn merge_with_topic_pruned_entry_id(
132        checkpoint: Option<Self>,
133        pruned_entry_id: Option<u64>,
134        is_metric_engine: bool,
135    ) -> Option<Self> {
136        match (checkpoint, pruned_entry_id) {
137            (Some(checkpoint), Some(pruned_entry_id)) => Some(Self {
138                entry_id: checkpoint.entry_id.max(pruned_entry_id),
139                metadata_entry_id: if is_metric_engine {
140                    Some(
141                        checkpoint
142                            .metadata_entry_id
143                            .unwrap_or_default()
144                            .max(pruned_entry_id),
145                    )
146                } else {
147                    checkpoint.metadata_entry_id
148                },
149            }),
150            (None, Some(pruned_entry_id)) => Some(Self {
151                entry_id: pruned_entry_id,
152                metadata_entry_id: is_metric_engine.then_some(pruned_entry_id),
153            }),
154            (checkpoint, None) => checkpoint,
155        }
156    }
157}
158
159impl TopicRegionValue {
160    /// Creates a new [`TopicRegionValue`] with the given checkpoint.
161    pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
162        Self { checkpoint }
163    }
164
165    /// Returns the minimum entry id of the region.
166    ///
167    /// If the metadata entry id is not set, it returns the entry id.
168    pub fn min_entry_id(&self) -> Option<u64> {
169        match self.checkpoint {
170            Some(ReplayCheckpoint {
171                entry_id,
172                metadata_entry_id,
173            }) => match metadata_entry_id {
174                Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
175                None => Some(entry_id),
176            },
177            None => None,
178        }
179    }
180}
181
182fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
183    let key = TopicRegionKey::from_bytes(&value.key)?;
184    let value = if value.value.is_empty() {
185        TopicRegionValue::default()
186    } else {
187        TopicRegionValue::try_from_raw_value(&value.value)?
188    };
189    Ok((key, value))
190}
191
192/// Manages map of topics and regions in kvbackend.
193pub struct TopicRegionManager {
194    kv_backend: KvBackendRef,
195}
196
197impl TopicRegionManager {
198    pub fn new(kv_backend: KvBackendRef) -> Self {
199        Self { kv_backend }
200    }
201
202    pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
203        let put_req = PutRequest {
204            key: key.to_bytes(),
205            value: vec![],
206            prev_kv: false,
207        };
208        self.kv_backend.put(put_req).await?;
209        Ok(())
210    }
211
212    pub async fn batch_get(
213        &self,
214        keys: Vec<TopicRegionKey<'_>>,
215    ) -> Result<HashMap<RegionId, TopicRegionValue>> {
216        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
217        let req = BatchGetRequest { keys: raw_keys };
218        let resp = self.kv_backend.batch_get(req).await?;
219
220        let v = resp
221            .kvs
222            .into_iter()
223            .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
224            .collect::<Result<HashMap<_, _>>>()?;
225
226        Ok(v)
227    }
228
229    pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
230        let key_bytes = key.to_bytes();
231        let resp = self.kv_backend.get(&key_bytes).await?;
232        let value = resp
233            .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
234            .transpose()?;
235
236        Ok(value)
237    }
238
239    pub async fn batch_put(
240        &self,
241        keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
242    ) -> Result<()> {
243        let req = BatchPutRequest {
244            kvs: keys
245                .iter()
246                .map(|(key, value)| {
247                    let value = value
248                        .map(|v| v.try_as_raw_value())
249                        .transpose()?
250                        .unwrap_or_default();
251
252                    Ok(KeyValue {
253                        key: key.to_bytes(),
254                        value,
255                    })
256                })
257                .collect::<Result<Vec<_>>>()?,
258            prev_kv: false,
259        };
260        self.kv_backend.batch_put(req).await?;
261        Ok(())
262    }
263
264    /// Build a create topic region mapping transaction. It only executes while the primary keys comparing successes.
265    pub fn build_create_txn(
266        &self,
267        table_id: TableId,
268        region_wal_options: &RegionWalOptions,
269    ) -> Result<Txn> {
270        let topic_region_mapping = self.get_topic_region_mapping(table_id, region_wal_options);
271        let topic_region_keys = topic_region_mapping
272            .iter()
273            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
274            .collect::<Vec<_>>();
275        let operations = topic_region_keys
276            .into_iter()
277            .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
278            .collect::<Vec<_>>();
279        Ok(Txn::new().and_then(operations))
280    }
281
282    /// Build a update topic region mapping transaction.
283    pub fn build_update_txn(
284        &self,
285        table_id: TableId,
286        old_region_wal_options: &RegionWalOptions,
287        new_region_wal_options: &RegionWalOptions,
288    ) -> Result<Txn> {
289        let old_mapping = self.get_topic_region_mapping(table_id, old_region_wal_options);
290        let new_mapping = self.get_topic_region_mapping(table_id, new_region_wal_options);
291
292        // Convert to HashMap for easier lookup: RegionId -> Topic
293        let old_map: HashMap<RegionId, &str> = old_mapping.into_iter().collect();
294        let new_map: HashMap<RegionId, &str> = new_mapping.into_iter().collect();
295        let mut ops = Vec::new();
296
297        // Check for deletes (in old but not in new, or topic changed)
298        for (region_id, old_topic) in &old_map {
299            match new_map.get(region_id) {
300                Some(new_topic) if *new_topic == *old_topic => {
301                    // Same topic, do nothing (preserve checkpoint)
302                }
303                _ => {
304                    // Removed or topic changed -> Delete old
305                    let key = TopicRegionKey::new(*region_id, old_topic);
306                    ops.push(TxnOp::Delete(key.to_bytes()));
307                }
308            }
309        }
310
311        // Check for adds (in new but not in old, or topic changed)
312        for (region_id, new_topic) in &new_map {
313            match old_map.get(region_id) {
314                Some(old_topic) if *old_topic == *new_topic => {
315                    // Same topic, already handled (do nothing)
316                }
317                _ => {
318                    // New or topic changed -> Put new
319                    let key = TopicRegionKey::new(*region_id, new_topic);
320                    // Initialize with empty value (default TopicRegionValue)
321                    ops.push(TxnOp::Put(key.to_bytes(), vec![]));
322                }
323            }
324        }
325
326        Ok(Txn::new().and_then(ops))
327    }
328
329    /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`].
330    pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
331        let prefix = TopicRegionKey::range_topic_key(topic);
332        let req = RangeRequest::new().with_prefix(prefix.as_bytes());
333        let resp = self.kv_backend.range(req).await?;
334        let region_ids = resp
335            .kvs
336            .iter()
337            .map(topic_region_decoder)
338            .collect::<Result<Vec<_>>>()?;
339        Ok(region_ids
340            .into_iter()
341            .map(|(key, value)| (key.region_id, value))
342            .collect())
343    }
344
345    pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
346        let raw_key = key.to_bytes();
347        self.kv_backend.delete(&raw_key, false).await?;
348        Ok(())
349    }
350
351    pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
352        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
353        let req = BatchDeleteRequest {
354            keys: raw_keys,
355            prev_kv: false,
356        };
357        self.kv_backend.batch_delete(req).await?;
358        Ok(())
359    }
360
361    /// Retrieves a mapping of [`RegionId`]s to their corresponding topics name
362    /// based on the provided table ID and WAL options.
363    ///
364    /// # Returns
365    /// A vector of tuples, where each tuple contains a [`RegionId`] and its corresponding topic name.
366    pub fn get_topic_region_mapping<'a>(
367        &self,
368        table_id: TableId,
369        region_wal_options: &'a RegionWalOptions,
370    ) -> Vec<(RegionId, &'a str)> {
371        region_wal_options
372            .keys()
373            .filter_map(
374                |region_number| match region_wal_options.get(region_number) {
375                    Some(WalOptions::Kafka(kafka)) => {
376                        let region_id = RegionId::new(table_id, *region_number);
377                        Some((region_id, kafka.topic.as_str()))
378                    }
379                    Some(WalOptions::RaftEngine) => None,
380                    Some(WalOptions::Noop) => None,
381                    None => None,
382                },
383            )
384            .collect::<Vec<_>>()
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use std::sync::Arc;
391
392    use common_wal::options::KafkaWalOptions;
393
394    use super::*;
395    use crate::kv_backend::memory::MemoryKvBackend;
396
397    #[test]
398    fn test_merge_checkpoint_with_topic_pruned_entry_id_missing_pruned() {
399        let checkpoint = Some(ReplayCheckpoint::new(10, None));
400
401        assert_eq!(
402            ReplayCheckpoint::merge_with_topic_pruned_entry_id(checkpoint, None, true),
403            checkpoint
404        );
405    }
406
407    #[test]
408    fn test_merge_checkpoint_with_topic_pruned_entry_id_creates_checkpoint() {
409        assert_eq!(
410            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), true),
411            Some(ReplayCheckpoint::new(10, Some(10)))
412        );
413    }
414
415    #[test]
416    fn test_merge_checkpoint_with_topic_pruned_entry_id_updates_both_ids() {
417        let checkpoint = ReplayCheckpoint::new(10, Some(5));
418
419        assert_eq!(
420            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
421            Some(ReplayCheckpoint::new(20, Some(20)))
422        );
423    }
424
425    #[test]
426    fn test_merge_checkpoint_with_topic_pruned_entry_id_preserves_larger_ids() {
427        let checkpoint = ReplayCheckpoint::new(30, Some(40));
428
429        assert_eq!(
430            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(20), true),
431            Some(checkpoint)
432        );
433    }
434
435    #[test]
436    fn test_merge_checkpoint_with_topic_pruned_entry_id_for_mito() {
437        assert_eq!(
438            ReplayCheckpoint::merge_with_topic_pruned_entry_id(None, Some(10), false),
439            Some(ReplayCheckpoint::new(10, None))
440        );
441
442        let checkpoint = ReplayCheckpoint::new(5, Some(8));
443        assert_eq!(
444            ReplayCheckpoint::merge_with_topic_pruned_entry_id(Some(checkpoint), Some(10), false),
445            Some(ReplayCheckpoint::new(10, Some(8)))
446        );
447    }
448
449    #[tokio::test]
450    async fn test_topic_region_manager() {
451        let kv_backend = Arc::new(MemoryKvBackend::default());
452        let manager = TopicRegionManager::new(kv_backend.clone());
453
454        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
455        let keys = (0..64)
456            .map(|i| {
457                (
458                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
459                    None,
460                )
461            })
462            .collect::<Vec<_>>();
463
464        manager.batch_put(&keys).await.unwrap();
465        let mut key_values = manager
466            .regions(&topics[0])
467            .await
468            .unwrap()
469            .into_keys()
470            .collect::<Vec<_>>();
471        let expected = keys
472            .iter()
473            .filter_map(|(key, _)| {
474                if key.topic == topics[0] {
475                    Some(key.region_id)
476                } else {
477                    None
478                }
479            })
480            .collect::<Vec<_>>();
481        key_values.sort_by_key(|id| id.as_u64());
482        assert_eq!(key_values, expected);
483
484        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
485        manager.delete(key.clone()).await.unwrap();
486        let mut key_values = manager
487            .regions(&topics[0])
488            .await
489            .unwrap()
490            .into_keys()
491            .collect::<Vec<_>>();
492        let expected = keys
493            .iter()
494            .filter_map(|(key, _)| {
495                if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
496                    Some(key.region_id)
497                } else {
498                    None
499                }
500            })
501            .collect::<Vec<_>>();
502        key_values.sort_by_key(|id| id.as_u64());
503        assert_eq!(key_values, expected);
504    }
505
506    #[test]
507    fn test_topic_region_map() {
508        let kv_backend = Arc::new(MemoryKvBackend::default());
509        let manager = TopicRegionManager::new(kv_backend.clone());
510
511        let table_id = 1;
512        let region_wal_options = (0..64)
513            .map(|i| {
514                let region_number = i;
515                let wal_options = if i % 2 == 0 {
516                    WalOptions::Kafka(KafkaWalOptions::new(format!("topic_{}", i)))
517                } else {
518                    WalOptions::RaftEngine
519                };
520                (region_number, wal_options)
521            })
522            .collect::<HashMap<_, _>>();
523
524        let mut topic_region_mapping =
525            manager.get_topic_region_mapping(table_id, &region_wal_options);
526        let mut expected = (0..64)
527            .filter_map(|i| {
528                if i % 2 == 0 {
529                    Some((RegionId::new(table_id, i), format!("topic_{}", i)))
530                } else {
531                    None
532                }
533            })
534            .collect::<Vec<_>>();
535        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
536        let topic_region_map = topic_region_mapping
537            .iter()
538            .map(|(region_id, topic)| (*region_id, topic.to_string()))
539            .collect::<Vec<_>>();
540        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
541        assert_eq!(topic_region_map, expected);
542    }
543
544    #[test]
545    fn test_topic_region_key_is_match() {
546        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
547        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
548        assert_eq!(
549            topic_region_key.topic,
550            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
551        );
552        assert_eq!(
553            topic_region_key.region_id,
554            RegionId::from_u64(4410931412992)
555        );
556    }
557
558    #[test]
559    fn test_build_create_txn() {
560        let kv_backend = Arc::new(MemoryKvBackend::default());
561        let manager = TopicRegionManager::new(kv_backend.clone());
562        let table_id = 1;
563        let region_wal_options = vec![
564            (
565                0,
566                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
567            ),
568            (
569                1,
570                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())),
571            ),
572            (2, WalOptions::RaftEngine), // Should be ignored
573        ]
574        .into_iter()
575        .collect::<HashMap<_, _>>();
576
577        let txn = manager
578            .build_create_txn(table_id, &region_wal_options)
579            .unwrap();
580
581        // Verify the transaction contains correct operations
582        // Should create mappings for region 0 and 1, but not region 2 (RaftEngine)
583        let ops = txn.req().success.clone();
584        assert_eq!(ops.len(), 2);
585
586        let keys: Vec<_> = ops
587            .iter()
588            .filter_map(|op| {
589                if let TxnOp::Put(key, _) = op {
590                    TopicRegionKey::from_bytes(key).ok()
591                } else {
592                    None
593                }
594            })
595            .collect();
596
597        assert_eq!(keys.len(), 2);
598        let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect();
599        assert!(region_ids.contains(&RegionId::new(table_id, 0)));
600        assert!(region_ids.contains(&RegionId::new(table_id, 1)));
601        assert!(!region_ids.contains(&RegionId::new(table_id, 2)));
602
603        // Verify topics are correct
604        for key in keys {
605            match key.region_id.region_number() {
606                0 => assert_eq!(key.topic, "topic_0"),
607                1 => assert_eq!(key.topic, "topic_1"),
608                _ => panic!("Unexpected region number"),
609            }
610        }
611    }
612
613    #[test]
614    fn test_build_update_txn_add_new_region() {
615        let kv_backend = Arc::new(MemoryKvBackend::default());
616        let manager = TopicRegionManager::new(kv_backend.clone());
617        let table_id = 1;
618        let old_region_wal_options = vec![(
619            0,
620            WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
621        )]
622        .into_iter()
623        .collect::<HashMap<_, _>>();
624        let new_region_wal_options = vec![
625            (
626                0,
627                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
628            ),
629            (
630                1,
631                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())),
632            ),
633        ]
634        .into_iter()
635        .collect::<HashMap<_, _>>();
636        let txn = manager
637            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
638            .unwrap();
639        let ops = txn.req().success.clone();
640        // Should only have Put for new region 1 (region 0 unchanged)
641        assert_eq!(ops.len(), 1);
642        if let TxnOp::Put(key, _) = &ops[0] {
643            let topic_key = TopicRegionKey::from_bytes(key).unwrap();
644            assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
645            assert_eq!(topic_key.topic, "topic_1");
646        } else {
647            panic!("Expected Put operation");
648        }
649    }
650
651    #[test]
652    fn test_build_update_txn_remove_region() {
653        let kv_backend = Arc::new(MemoryKvBackend::default());
654        let manager = TopicRegionManager::new(kv_backend.clone());
655        let table_id = 1;
656        let old_region_wal_options = vec![
657            (
658                0,
659                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
660            ),
661            (
662                1,
663                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())),
664            ),
665        ]
666        .into_iter()
667        .collect::<HashMap<_, _>>();
668        let new_region_wal_options = vec![(
669            0,
670            WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
671        )]
672        .into_iter()
673        .collect::<HashMap<_, _>>();
674        let txn = manager
675            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
676            .unwrap();
677        let ops = txn.req().success.clone();
678        // Should only have Delete for removed region 1 (region 0 unchanged)
679        assert_eq!(ops.len(), 1);
680        match &ops[0] {
681            TxnOp::Delete(key) => {
682                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
683                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
684                assert_eq!(topic_key.topic, "topic_1");
685            }
686            TxnOp::Put(_, _) | TxnOp::Get(_) => {
687                panic!("Expected Delete operation");
688            }
689        }
690    }
691
692    #[test]
693    fn test_build_update_txn_change_topic() {
694        let kv_backend = Arc::new(MemoryKvBackend::default());
695        let manager = TopicRegionManager::new(kv_backend.clone());
696        let table_id = 1;
697        let old_region_wal_options = vec![(
698            0,
699            WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
700        )]
701        .into_iter()
702        .collect::<HashMap<_, _>>();
703        let new_region_wal_options = vec![(
704            0,
705            WalOptions::Kafka(KafkaWalOptions::new("topic_0_new".to_string())),
706        )]
707        .into_iter()
708        .collect::<HashMap<_, _>>();
709        let txn = manager
710            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
711            .unwrap();
712        let ops = txn.req().success.clone();
713        // Should have Delete for old topic and Put for new topic
714        assert_eq!(ops.len(), 2);
715
716        let mut delete_found = false;
717        let mut put_found = false;
718        for op in ops {
719            match op {
720                TxnOp::Delete(key) => {
721                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
722                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
723                    assert_eq!(topic_key.topic, "topic_0");
724                    delete_found = true;
725                }
726                TxnOp::Put(key, _) => {
727                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
728                    assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
729                    assert_eq!(topic_key.topic, "topic_0_new");
730                    put_found = true;
731                }
732                TxnOp::Get(_) => {
733                    // Get operations shouldn't appear in this context
734                    panic!("Unexpected Get operation in update transaction");
735                }
736            }
737        }
738        assert!(delete_found, "Expected Delete operation for old topic");
739        assert!(put_found, "Expected Put operation for new topic");
740    }
741
742    #[test]
743    fn test_build_update_txn_no_change() {
744        let kv_backend = Arc::new(MemoryKvBackend::default());
745        let manager = TopicRegionManager::new(kv_backend.clone());
746        let table_id = 1;
747        let region_wal_options = vec![
748            (
749                0,
750                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
751            ),
752            (
753                1,
754                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())),
755            ),
756        ]
757        .into_iter()
758        .collect::<HashMap<_, _>>();
759        let txn = manager
760            .build_update_txn(table_id, &region_wal_options, &region_wal_options)
761            .unwrap();
762        // Should have no operations when nothing changes (preserves checkpoint)
763        let ops = txn.req().success.clone();
764        assert_eq!(ops.len(), 0);
765    }
766
767    #[test]
768    fn test_build_update_txn_mixed_scenarios() {
769        let kv_backend = Arc::new(MemoryKvBackend::default());
770        let manager = TopicRegionManager::new(kv_backend.clone());
771        let table_id = 1;
772        let old_region_wal_options = vec![
773            (
774                0,
775                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
776            ),
777            (
778                1,
779                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())),
780            ),
781            (
782                2,
783                WalOptions::Kafka(KafkaWalOptions::new("topic_2".to_string())),
784            ),
785        ]
786        .into_iter()
787        .collect::<HashMap<_, _>>();
788        let new_region_wal_options = vec![
789            (
790                0,
791                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())), // Unchanged
792            ),
793            (
794                1,
795                WalOptions::Kafka(KafkaWalOptions::new("topic_1_new".to_string())), // Topic changed
796            ),
797            // Region 2 removed
798            (
799                3,
800                WalOptions::Kafka(KafkaWalOptions::new("topic_3".to_string())), // New region
801            ),
802        ]
803        .into_iter()
804        .collect::<HashMap<_, _>>();
805        let txn = manager
806            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
807            .unwrap();
808
809        let ops = txn.req().success.clone();
810        // Should have:
811        // - Delete for region 2 (removed)
812        // - Delete for region 1 old topic (topic changed)
813        // - Put for region 1 new topic (topic changed)
814        // - Put for region 3 (new)
815        // Region 0 unchanged, so no operation
816        assert_eq!(ops.len(), 4);
817
818        let mut delete_ops = 0;
819        let mut put_ops = 0;
820        let mut delete_region_2 = false;
821        let mut delete_region_1_old = false;
822        let mut put_region_1_new = false;
823        let mut put_region_3 = false;
824
825        for op in ops {
826            match op {
827                TxnOp::Delete(key) => {
828                    delete_ops += 1;
829                    let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
830                    match topic_key.region_id.region_number() {
831                        1 => {
832                            assert_eq!(topic_key.topic, "topic_1");
833                            delete_region_1_old = true;
834                        }
835                        2 => {
836                            assert_eq!(topic_key.topic, "topic_2");
837                            delete_region_2 = true;
838                        }
839                        _ => panic!("Unexpected delete operation for region"),
840                    }
841                }
842                TxnOp::Put(key, _) => {
843                    put_ops += 1;
844                    let topic_key: TopicRegionKey<'_> = TopicRegionKey::from_bytes(&key).unwrap();
845                    match topic_key.region_id.region_number() {
846                        1 => {
847                            assert_eq!(topic_key.topic, "topic_1_new");
848                            put_region_1_new = true;
849                        }
850                        3 => {
851                            assert_eq!(topic_key.topic, "topic_3");
852                            put_region_3 = true;
853                        }
854                        _ => panic!("Unexpected put operation for region"),
855                    }
856                }
857                TxnOp::Get(_) => {
858                    panic!("Unexpected Get operation in update transaction");
859                }
860            }
861        }
862
863        assert_eq!(delete_ops, 2);
864        assert_eq!(put_ops, 2);
865        assert!(delete_region_2, "Expected delete for removed region 2");
866        assert!(
867            delete_region_1_old,
868            "Expected delete for region 1 old topic"
869        );
870        assert!(put_region_1_new, "Expected put for region 1 new topic");
871        assert!(put_region_3, "Expected put for new region 3");
872    }
873
874    #[test]
875    fn test_build_update_txn_with_raft_engine() {
876        let kv_backend = Arc::new(MemoryKvBackend::default());
877        let manager = TopicRegionManager::new(kv_backend.clone());
878        let table_id = 1;
879        let old_region_wal_options = vec![
880            (
881                0,
882                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
883            ),
884            (1, WalOptions::RaftEngine), // Should be ignored
885        ]
886        .into_iter()
887        .collect::<HashMap<_, _>>();
888        let new_region_wal_options = vec![
889            (
890                0,
891                WalOptions::Kafka(KafkaWalOptions::new("topic_0".to_string())),
892            ),
893            (
894                1,
895                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())), // Changed from RaftEngine to Kafka
896            ),
897        ]
898        .into_iter()
899        .collect::<HashMap<_, _>>();
900        let txn = manager
901            .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
902            .unwrap();
903        let ops = txn.req().success.clone();
904        // Should only have Put for region 1 (new Kafka topic)
905        // Region 0 unchanged, so no operation
906        // Region 1 was RaftEngine before (not tracked), so only Put needed
907        assert_eq!(ops.len(), 1);
908        match &ops[0] {
909            TxnOp::Put(key, _) => {
910                let topic_key = TopicRegionKey::from_bytes(key).unwrap();
911                assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
912                assert_eq!(topic_key.topic, "topic_1");
913            }
914            TxnOp::Delete(_) | TxnOp::Get(_) => {
915                panic!("Expected Put operation for new Kafka region");
916            }
917        }
918    }
919}