common_meta/key/
topic_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use common_wal::options::WalOptions;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::ddl::utils::parse_region_wal_options;
25use crate::error::{Error, InvalidMetadataSnafu, Result};
26use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
27use crate::kv_backend::KvBackendRef;
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::rpc::KeyValue;
30use crate::rpc::store::{
31    BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
32};
33
/// Key of the topic-region mapping in the kv backend.
///
/// The encoded layout is `__topic_region/{topic_name}/{region_id}`
/// (e.g. `__topic_region/topic_0/4410931412992`), where `region_id` is the
/// decimal form of [`RegionId::as_u64`].
#[derive(Debug, Clone, PartialEq)]
pub struct TopicRegionKey<'a> {
    /// The region this mapping entry belongs to.
    pub region_id: RegionId,
    /// Name of the WAL topic; borrowed either from the caller or from the raw
    /// key bytes the key was decoded from.
    pub topic: &'a str,
}
41
/// Represents additional information for a region when using a shared WAL.
///
/// A stored value may also be empty, in which case it decodes to
/// [`TopicRegionValue::default`] (no checkpoint).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub struct TopicRegionValue {
    /// Replay checkpoint of the region, if one has been recorded.
    /// Omitted from the serialized form when `None`; treated as `None` when the
    /// field is missing on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint: Option<ReplayCheckpoint>,
}
48
/// Checkpoint recorded for a region that shares a WAL topic.
///
/// NOTE: field order is significant. The derived `PartialOrd`/`Ord` compare
/// `entry_id` first and `metadata_entry_id` second, and serde emits fields in
/// declaration order.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint; deserializes to `0` when the field is missing.
    /// Presumably the WAL entry id replay resumes from — TODO confirm.
    #[serde(default)]
    pub entry_id: u64,
    /// Optional metadata entry id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata_entry_id: Option<u64>,
}
56
57impl<'a> TopicRegionKey<'a> {
58    pub fn new(region_id: RegionId, topic: &'a str) -> Self {
59        Self { region_id, topic }
60    }
61
62    pub fn range_topic_key(topic: &str) -> String {
63        format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
64    }
65}
66
67impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
68    fn to_bytes(&self) -> Vec<u8> {
69        self.to_string().into_bytes()
70    }
71
72    fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
73        let key = std::str::from_utf8(bytes).map_err(|e| {
74            InvalidMetadataSnafu {
75                err_msg: format!(
76                    "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
77                    String::from_utf8_lossy(bytes)
78                ),
79            }
80            .build()
81        })?;
82        TopicRegionKey::try_from(key)
83    }
84}
85
86impl Display for TopicRegionKey<'_> {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        write!(
89            f,
90            "{}{}",
91            Self::range_topic_key(self.topic),
92            self.region_id.as_u64()
93        )
94    }
95}
96
97impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
98    type Error = Error;
99
100    /// Value is of format `{prefix}/{topic}/{region_id}`
101    fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
102        let captures = TOPIC_REGION_PATTERN
103            .captures(value)
104            .context(InvalidMetadataSnafu {
105                err_msg: format!("Invalid TopicRegionKey: {}", value),
106            })?;
107        let topic = captures.get(1).map(|m| m.as_str()).unwrap();
108        let region_id = captures[2].parse::<u64>().map_err(|_| {
109            InvalidMetadataSnafu {
110                err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
111            }
112            .build()
113        })?;
114        Ok(TopicRegionKey {
115            region_id: RegionId::from_u64(region_id),
116            topic,
117        })
118    }
119}
120
121impl ReplayCheckpoint {
122    /// Creates a new [`ReplayCheckpoint`] with the given entry id and metadata entry id.
123    pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
124        Self {
125            entry_id,
126            metadata_entry_id,
127        }
128    }
129}
130
131impl TopicRegionValue {
132    /// Creates a new [`TopicRegionValue`] with the given checkpoint.
133    pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
134        Self { checkpoint }
135    }
136
137    /// Returns the minimum entry id of the region.
138    ///
139    /// If the metadata entry id is not set, it returns the entry id.
140    pub fn min_entry_id(&self) -> Option<u64> {
141        match self.checkpoint {
142            Some(ReplayCheckpoint {
143                entry_id,
144                metadata_entry_id,
145            }) => match metadata_entry_id {
146                Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
147                None => Some(entry_id),
148            },
149            None => None,
150        }
151    }
152}
153
154fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
155    let key = TopicRegionKey::from_bytes(&value.key)?;
156    let value = if value.value.is_empty() {
157        TopicRegionValue::default()
158    } else {
159        TopicRegionValue::try_from_raw_value(&value.value)?
160    };
161    Ok((key, value))
162}
163
/// Manages the topic-to-region mapping stored in the kv backend.
///
/// Each entry is a [`TopicRegionKey`] (`{prefix}/{topic}/{region_id}`) whose
/// value is either empty or a serialized [`TopicRegionValue`].
pub struct TopicRegionManager {
    kv_backend: KvBackendRef,
}
168
169impl TopicRegionManager {
170    pub fn new(kv_backend: KvBackendRef) -> Self {
171        Self { kv_backend }
172    }
173
174    pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
175        let put_req = PutRequest {
176            key: key.to_bytes(),
177            value: vec![],
178            prev_kv: false,
179        };
180        self.kv_backend.put(put_req).await?;
181        Ok(())
182    }
183
184    pub async fn batch_get(
185        &self,
186        keys: Vec<TopicRegionKey<'_>>,
187    ) -> Result<HashMap<RegionId, TopicRegionValue>> {
188        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
189        let req = BatchGetRequest { keys: raw_keys };
190        let resp = self.kv_backend.batch_get(req).await?;
191
192        let v = resp
193            .kvs
194            .into_iter()
195            .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
196            .collect::<Result<HashMap<_, _>>>()?;
197
198        Ok(v)
199    }
200
201    pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
202        let key_bytes = key.to_bytes();
203        let resp = self.kv_backend.get(&key_bytes).await?;
204        let value = resp
205            .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
206            .transpose()?;
207
208        Ok(value)
209    }
210
211    pub async fn batch_put(
212        &self,
213        keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
214    ) -> Result<()> {
215        let req = BatchPutRequest {
216            kvs: keys
217                .iter()
218                .map(|(key, value)| {
219                    let value = value
220                        .map(|v| v.try_as_raw_value())
221                        .transpose()?
222                        .unwrap_or_default();
223
224                    Ok(KeyValue {
225                        key: key.to_bytes(),
226                        value,
227                    })
228                })
229                .collect::<Result<Vec<_>>>()?,
230            prev_kv: false,
231        };
232        self.kv_backend.batch_put(req).await?;
233        Ok(())
234    }
235
236    /// Build a create topic region mapping transaction. It only executes while the primary keys comparing successes.
237    pub fn build_create_txn(
238        &self,
239        table_id: TableId,
240        region_wal_options: &HashMap<RegionNumber, String>,
241    ) -> Result<Txn> {
242        let region_wal_options = parse_region_wal_options(region_wal_options)?;
243        let topic_region_mapping = self.get_topic_region_mapping(table_id, &region_wal_options);
244        let topic_region_keys = topic_region_mapping
245            .iter()
246            .map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id))
247            .collect::<Vec<_>>();
248        let operations = topic_region_keys
249            .into_iter()
250            .map(|key| TxnOp::Put(key.to_bytes(), vec![]))
251            .collect::<Vec<_>>();
252        Ok(Txn::new().and_then(operations))
253    }
254
255    /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`].
256    pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
257        let prefix = TopicRegionKey::range_topic_key(topic);
258        let req = RangeRequest::new().with_prefix(prefix.as_bytes());
259        let resp = self.kv_backend.range(req).await?;
260        let region_ids = resp
261            .kvs
262            .iter()
263            .map(topic_region_decoder)
264            .collect::<Result<Vec<_>>>()?;
265        Ok(region_ids
266            .into_iter()
267            .map(|(key, value)| (key.region_id, value))
268            .collect())
269    }
270
271    pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
272        let raw_key = key.to_bytes();
273        self.kv_backend.delete(&raw_key, false).await?;
274        Ok(())
275    }
276
277    pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
278        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
279        let req = BatchDeleteRequest {
280            keys: raw_keys,
281            prev_kv: false,
282        };
283        self.kv_backend.batch_delete(req).await?;
284        Ok(())
285    }
286
287    /// Retrieves a mapping of [`RegionId`]s to their corresponding topics name
288    /// based on the provided table ID and WAL options.
289    ///
290    /// # Returns
291    /// A vector of tuples, where each tuple contains a [`RegionId`] and its corresponding topic name.
292    pub fn get_topic_region_mapping<'a>(
293        &self,
294        table_id: TableId,
295        region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
296    ) -> Vec<(RegionId, &'a str)> {
297        region_wal_options
298            .keys()
299            .filter_map(
300                |region_number| match region_wal_options.get(region_number) {
301                    Some(WalOptions::Kafka(kafka)) => {
302                        let region_id = RegionId::new(table_id, *region_number);
303                        Some((region_id, kafka.topic.as_str()))
304                    }
305                    Some(WalOptions::RaftEngine) => None,
306                    Some(WalOptions::Noop) => None,
307                    None => None,
308                },
309            )
310            .collect::<Vec<_>>()
311    }
312}
313
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_wal::options::KafkaWalOptions;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    #[tokio::test]
    async fn test_topic_region_manager() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
        let keys = (0..64)
            .map(|i| {
                (
                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
                    None,
                )
            })
            .collect::<Vec<_>>();

        manager.batch_put(&keys).await.unwrap();
        let mut key_values = manager
            .regions(&topics[0])
            .await
            .unwrap()
            .into_keys()
            .collect::<Vec<_>>();
        let expected = keys
            .iter()
            .filter_map(|(key, _)| {
                if key.topic == topics[0] {
                    Some(key.region_id)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        key_values.sort_by_key(|id| id.as_u64());
        assert_eq!(key_values, expected);

        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
        manager.delete(key).await.unwrap();
        let mut key_values = manager
            .regions(&topics[0])
            .await
            .unwrap()
            .into_keys()
            .collect::<Vec<_>>();
        let expected = keys
            .iter()
            .filter_map(|(key, _)| {
                if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
                    Some(key.region_id)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        key_values.sort_by_key(|id| id.as_u64());
        assert_eq!(key_values, expected);
    }

    #[tokio::test]
    async fn test_topic_region_manager_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        let checkpoint = ReplayCheckpoint::new(42, Some(7));
        let with_checkpoint = TopicRegionKey::new(RegionId::new(1, 0), "topic_0");
        let without_checkpoint = TopicRegionKey::new(RegionId::new(1, 1), "topic_0");
        manager
            .batch_put(&[
                (
                    with_checkpoint.clone(),
                    Some(TopicRegionValue::new(Some(checkpoint))),
                ),
                (without_checkpoint.clone(), None),
            ])
            .await
            .unwrap();

        let value = manager.get(with_checkpoint).await.unwrap().unwrap();
        assert_eq!(value.checkpoint, Some(checkpoint));
        assert_eq!(value.min_entry_id(), Some(7));

        // A `None` value is stored empty and decodes to "no checkpoint".
        let value = manager.get(without_checkpoint).await.unwrap().unwrap();
        assert_eq!(value.checkpoint, None);
        assert_eq!(value.min_entry_id(), None);

        let missing = TopicRegionKey::new(RegionId::new(1, 2), "topic_0");
        assert!(manager.get(missing).await.unwrap().is_none());
    }

    #[test]
    fn test_topic_region_map() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        let table_id = 1;
        let region_wal_options = (0..64)
            .map(|i| {
                let region_number = i;
                let wal_options = if i % 2 == 0 {
                    WalOptions::Kafka(KafkaWalOptions {
                        topic: format!("topic_{}", i),
                    })
                } else {
                    WalOptions::RaftEngine
                };
                (region_number, serde_json::to_string(&wal_options).unwrap())
            })
            .collect::<HashMap<_, _>>();

        let region_wal_options = parse_region_wal_options(&region_wal_options).unwrap();
        let mut topic_region_mapping =
            manager.get_topic_region_mapping(table_id, &region_wal_options);
        let mut expected = (0..64)
            .filter_map(|i| {
                if i % 2 == 0 {
                    Some((RegionId::new(table_id, i), format!("topic_{}", i)))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
        let topic_region_map = topic_region_mapping
            .iter()
            .map(|(region_id, topic)| (*region_id, topic.to_string()))
            .collect::<Vec<_>>();
        expected.sort_by_key(|(region_id, _)| region_id.as_u64());
        assert_eq!(topic_region_map, expected);
    }

    #[test]
    fn test_topic_region_key_is_match() {
        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
        assert_eq!(
            topic_region_key.topic,
            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
        );
        assert_eq!(
            topic_region_key.region_id,
            RegionId::from_u64(4410931412992)
        );
    }

    #[test]
    fn test_topic_region_key_round_trip() {
        let key = TopicRegionKey::new(RegionId::new(1024, 3), "topic_0");
        let bytes = key.to_bytes();
        let decoded = TopicRegionKey::from_bytes(&bytes).unwrap();
        assert_eq!(decoded, key);
    }

    #[test]
    fn test_topic_region_key_invalid() {
        assert!(TopicRegionKey::try_from("invalid_key").is_err());
        let not_utf8: &[u8] = &[0xff, 0xfe];
        assert!(TopicRegionKey::from_bytes(not_utf8).is_err());
    }

    #[test]
    fn test_min_entry_id() {
        assert_eq!(TopicRegionValue::default().min_entry_id(), None);

        let value = TopicRegionValue::new(Some(ReplayCheckpoint::new(10, None)));
        assert_eq!(value.min_entry_id(), Some(10));

        let value = TopicRegionValue::new(Some(ReplayCheckpoint::new(10, Some(5))));
        assert_eq!(value.min_entry_id(), Some(5));

        let value = TopicRegionValue::new(Some(ReplayCheckpoint::new(3, Some(5))));
        assert_eq!(value.min_entry_id(), Some(3));
    }
}