common_meta/key/
topic_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2023 Greptime Team
16//
17// Licensed under the Apache License, Version 2.0 (the "License");
18// you may not use this file except in compliance with the License.
19// You may obtain a copy of the License at
20//
21//     http://www.apache.org/licenses/LICENSE-2.0
22//
23// Unless required by applicable law or agreed to in writing, software
24// distributed under the License is distributed on an "AS IS" BASIS,
25// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26// See the License for the specific language governing permissions and
27// limitations under the License.
28
29use std::collections::HashMap;
30use std::fmt::{self, Display};
31
32use common_wal::options::WalOptions;
33use serde::{Deserialize, Serialize};
34use snafu::OptionExt;
35use store_api::storage::{RegionId, RegionNumber};
36use table::metadata::TableId;
37
38use crate::ddl::utils::parse_region_wal_options;
39use crate::error::{Error, InvalidMetadataSnafu, Result};
40use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
41use crate::kv_backend::txn::{Txn, TxnOp};
42use crate::kv_backend::KvBackendRef;
43use crate::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
44use crate::rpc::KeyValue;
45
/// The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
///
/// The layout of the key is `__topic_region/{topic_name}/{region_id}`.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicRegionKey<'a> {
    /// Id of the region; it forms the last segment of the key, rendered as a `u64`.
    pub region_id: RegionId,
    /// Name of the topic. Borrowed, so the key cannot outlive the string it points into.
    pub topic: &'a str,
}
53
/// Unit type, presumably the value counterpart of [`TopicRegionKey`].
///
/// NOTE(review): nothing in this file reads or writes this type (the manager
/// stores entries with an empty byte value) — confirm its users elsewhere.
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicRegionValue;
56
57impl<'a> TopicRegionKey<'a> {
58    pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59        Self { region_id, topic }
60    }
61
62    pub fn range_topic_key(topic: &str) -> String {
63        format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64    }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68    fn to_bytes(&self) -> Vec<u8> {
69        self.to_string().into_bytes()
70    }
71
72    fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73        let key = std::str::from_utf8(bytes).map_err(|e| {
74            InvalidMetadataSnafu {
75                err_msg: format!(
76                    "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77                    String::from_utf8_lossy(bytes)
78                ),
79            }
80            .build()
81        })?;
82        TopicRegionKey::try_from(key)
83    }
84}
85
86impl Display for TopicRegionKey<'_> {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        write!(
89            f,
90            "{}{}",
91            Self::range_topic_key(self.topic),
92            self.region_id.as_u64()
93        )
94    }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98    type Error = Error;
99
100    /// Value is of format `{prefix}/{topic}/{region_id}`
101    fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102        let captures = TOPIC_REGION_PATTERN
103            .captures(value)
104            .context(InvalidMetadataSnafu {
105                err_msg: format!("Invalid TopicRegionKey: {}", value),
106            })?;
107        let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108        let region_id = captures[2].parse::<u64>().map_err(|_| {
109            InvalidMetadataSnafu {
110                err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111            }
112            .build()
113        })?;
114        Ok(TopicRegionKey {
115            region_id: RegionId::from_u64(region_id),
116            topic,
117        })
118    }
119}
120
121fn topic_region_decoder(value: &KeyValue) -> Result<TopicRegionKey<'_>> {
122    let key = TopicRegionKey::from_bytes(&value.key)?;
123    Ok(key)
124}
125
/// Manages map of topics and regions in kvbackend.
pub struct TopicRegionManager {
    /// Backend through which every mapping entry is read, written and deleted.
    kv_backend: KvBackendRef,
}
130
131impl TopicRegionManager {
132    pub fn new(kv_backend: KvBackendRef) -> Self {
133        Self { kv_backend }
134    }
135
136    pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
137        let put_req = PutRequest {
138            key: key.to_bytes(),
139            value: vec![],
140            prev_kv: false,
141        };
142        self.kv_backend.put(put_req).await?;
143        Ok(())
144    }
145
146    pub async fn batch_put(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
147        let req = BatchPutRequest {
148            kvs: keys
149                .into_iter()
150                .map(|key| KeyValue {
151                    key: key.to_bytes(),
152                    value: vec![],
153                })
154                .collect(),
155            prev_kv: false,
156        };
157        self.kv_backend.batch_put(req).await?;
158        Ok(())
159    }
160
161    pub fn build_create_txn(
162        &self,
163        table_id: TableId,
164        region_wal_options: &HashMap<RegionNumber, String>,
165    ) -> Result<Txn> {
166        let region_wal_options = parse_region_wal_options(region_wal_options)?;
167        let topic_region_mapping = self.get_topic_region_mapping(table_id, &region_wal_options);
168        let topic_region_keys = topic_region_mapping
169            .iter()
170            .map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id))
171            .collect::<Vec<_>>();
172        let operations = topic_region_keys
173            .into_iter()
174            .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
175            .collect::<Vec<_>>();
176        Ok(Txn::new().and_then(operations))
177    }
178
179    /// Returns the list of region ids using specified topic.
180    pub async fn regions(&self, topic: &str) -> Result<Vec<RegionId>> {
181        let prefix = TopicRegionKey::range_topic_key(topic);
182        let req = RangeRequest::new().with_prefix(prefix.as_bytes());
183        let resp = self.kv_backend.range(req).await?;
184        let region_ids = resp
185            .kvs
186            .iter()
187            .map(topic_region_decoder)
188            .collect::<Result<Vec<_>>>()?;
189        Ok(region_ids.iter().map(|key| key.region_id).collect())
190    }
191
192    pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
193        let raw_key = key.to_bytes();
194        self.kv_backend.delete(&raw_key, false).await?;
195        Ok(())
196    }
197
198    pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
199        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
200        let req = BatchDeleteRequest {
201            keys: raw_keys,
202            prev_kv: false,
203        };
204        self.kv_backend.batch_delete(req).await?;
205        Ok(())
206    }
207
208    /// Retrieves a mapping of [`RegionId`]s to their corresponding topics name
209    /// based on the provided table ID and WAL options.
210    ///
211    /// # Returns
212    /// A vector of tuples, where each tuple contains a [`RegionId`] and its corresponding topic name.
213    pub fn get_topic_region_mapping<'a>(
214        &self,
215        table_id: TableId,
216        region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
217    ) -> Vec<(RegionId, &'a str)> {
218        region_wal_options
219            .keys()
220            .filter_map(
221                |region_number| match region_wal_options.get(region_number) {
222                    Some(WalOptions::Kafka(kafka)) => {
223                        let region_id = RegionId::new(table_id, *region_number);
224                        Some((region_id, kafka.topic.as_str()))
225                    }
226                    Some(WalOptions::RaftEngine) => None,
227                    Some(WalOptions::Noop) => None,
228                    None => None,
229                },
230            )
231            .collect::<Vec<_>>()
232    }
233}
234
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_wal::options::KafkaWalOptions;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Round-trips entries through `batch_put`, `regions` and `delete` on an
    /// in-memory backend.
    #[tokio::test]
    async fn test_topic_region_manager() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        // 64 regions spread round-robin over 16 topics.
        let topics: Vec<String> = (0..16).map(|i| format!("topic_{i}")).collect();
        let keys: Vec<_> = (0..64)
            .map(|i| TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]))
            .collect();

        manager.batch_put(keys.clone()).await.unwrap();

        let mut region_ids = manager.regions(&topics[0]).await.unwrap();
        region_ids.sort_by_key(|id| id.as_u64());
        let expected: Vec<RegionId> = keys
            .iter()
            .filter(|key| key.topic == topics[0])
            .map(|key| key.region_id)
            .collect();
        assert_eq!(region_ids, expected);

        // After deleting region 0 it must no longer be listed for its topic.
        let removed = RegionId::from_u64(0);
        let key = TopicRegionKey::new(removed, "topic_0");
        manager.delete(key.clone()).await.unwrap();

        let mut region_ids = manager.regions(&topics[0]).await.unwrap();
        region_ids.sort_by_key(|id| id.as_u64());
        let expected: Vec<RegionId> = keys
            .iter()
            .filter(|key| key.topic == topics[0] && key.region_id != removed)
            .map(|key| key.region_id)
            .collect();
        assert_eq!(region_ids, expected);
    }

    /// Only regions with Kafka WAL options may show up in the mapping.
    #[test]
    fn test_topic_region_map() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        let table_id = 1;
        // Even region numbers use Kafka, odd ones use raft-engine.
        let encoded_options: HashMap<_, _> = (0..64)
            .map(|region_number| {
                let wal_options = match region_number % 2 {
                    0 => WalOptions::Kafka(KafkaWalOptions {
                        topic: format!("topic_{}", region_number),
                    }),
                    _ => WalOptions::RaftEngine,
                };
                (region_number, serde_json::to_string(&wal_options).unwrap())
            })
            .collect();

        let region_wal_options = parse_region_wal_options(&encoded_options).unwrap();
        let mut mapping = manager.get_topic_region_mapping(table_id, &region_wal_options);
        mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
        let actual: Vec<(RegionId, String)> = mapping
            .iter()
            .map(|(region_id, topic)| (*region_id, topic.to_string()))
            .collect();

        let mut expected: Vec<_> = (0..64)
            .step_by(2)
            .map(|i| (RegionId::new(table_id, i), format!("topic_{}", i)))
            .collect();
        expected.sort_by_key(|(region_id, _)| region_id.as_u64());

        assert_eq!(actual, expected);
    }
}