common_meta/key/
catalog_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use common_catalog::consts::DEFAULT_CATALOG_NAME;
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{self, Error, InvalidMetadataSnafu, Result};
23use crate::key::{MetadataKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
24use crate::kv_backend::KvBackendRef;
25use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
26use crate::rpc::store::RangeRequest;
27use crate::rpc::KeyValue;
28
/// The catalog name key, which indexes all catalog names.
///
/// The layout: `__catalog_name/{catalog_name}`
///
/// The key only borrows the catalog name, so it is cheap to copy. It derives
/// `Eq` and `Hash` in addition to `PartialEq` so it can be used as a member of
/// hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CatalogNameKey<'a> {
    /// The catalog name this key refers to (the part after the prefix and `/`).
    pub catalog: &'a str,
}
36
37impl Default for CatalogNameKey<'_> {
38    fn default() -> Self {
39        Self {
40            catalog: DEFAULT_CATALOG_NAME,
41        }
42    }
43}
44
45#[derive(Debug, Serialize, Deserialize)]
46pub struct CatalogNameValue;
47
48impl<'a> CatalogNameKey<'a> {
49    pub fn new(catalog: &'a str) -> Self {
50        Self { catalog }
51    }
52
53    pub fn range_start_key() -> String {
54        format!("{}/", CATALOG_NAME_KEY_PREFIX)
55    }
56}
57
58impl<'a> MetadataKey<'a, CatalogNameKey<'a>> for CatalogNameKey<'_> {
59    fn to_bytes(&self) -> Vec<u8> {
60        self.to_string().into_bytes()
61    }
62
63    fn from_bytes(bytes: &'a [u8]) -> Result<CatalogNameKey<'a>> {
64        let key = std::str::from_utf8(bytes).map_err(|e| {
65            InvalidMetadataSnafu {
66                err_msg: format!(
67                    "CatalogNameKey '{}' is not a valid UTF8 string: {e}",
68                    String::from_utf8_lossy(bytes)
69                ),
70            }
71            .build()
72        })?;
73        CatalogNameKey::try_from(key)
74    }
75}
76
77impl Display for CatalogNameKey<'_> {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog)
80    }
81}
82
83impl<'a> TryFrom<&'a str> for CatalogNameKey<'a> {
84    type Error = Error;
85
86    fn try_from(s: &'a str) -> Result<Self> {
87        let captures = CATALOG_NAME_KEY_PATTERN
88            .captures(s)
89            .context(InvalidMetadataSnafu {
90                err_msg: format!("Illegal CatalogNameKey format: '{s}'"),
91            })?;
92
93        // Safety: pass the regex check above
94        Ok(Self {
95            catalog: captures.get(1).unwrap().as_str(),
96        })
97    }
98}
99
100/// Decoder `KeyValue` to {catalog}
101pub fn catalog_decoder(kv: KeyValue) -> Result<String> {
102    let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
103    let catalog_name = CatalogNameKey::try_from(str)?;
104
105    Ok(catalog_name.catalog.to_string())
106}
107
108pub struct CatalogManager {
109    kv_backend: KvBackendRef,
110}
111
112impl CatalogManager {
113    pub fn new(kv_backend: KvBackendRef) -> Self {
114        Self { kv_backend }
115    }
116
117    /// Creates `CatalogNameKey`.
118    pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> {
119        let _timer = crate::metrics::METRIC_META_CREATE_CATALOG.start_timer();
120
121        let raw_key = catalog.to_bytes();
122        let raw_value = CatalogNameValue.try_as_raw_value()?;
123        if self
124            .kv_backend
125            .put_conditionally(raw_key, raw_value, if_not_exists)
126            .await?
127        {
128            crate::metrics::METRIC_META_CREATE_CATALOG_COUNTER.inc();
129        }
130
131        Ok(())
132    }
133
134    pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
135        let raw_key = catalog.to_bytes();
136
137        self.kv_backend.exists(&raw_key).await
138    }
139
140    pub fn catalog_names(&self) -> BoxStream<'static, Result<String>> {
141        let start_key = CatalogNameKey::range_start_key();
142        let req = RangeRequest::new().with_prefix(start_key.as_bytes());
143
144        let stream = PaginationStream::new(
145            self.kv_backend.clone(),
146            req,
147            DEFAULT_PAGE_SIZE,
148            catalog_decoder,
149        )
150        .into_stream();
151
152        Box::pin(stream)
153    }
154}
155
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// A key renders to its textual layout and parses back to an equal key.
    #[test]
    fn test_serialization() {
        const RAW: &str = "__catalog_name/my-catalog";

        let expected = CatalogNameKey::new("my-catalog");
        assert_eq!(expected.to_string(), RAW);

        let decoded = CatalogNameKey::from_bytes(RAW.as_bytes()).unwrap();
        assert_eq!(expected, decoded);
    }

    /// A created catalog is reported as existing; an unknown one is not.
    #[tokio::test]
    async fn test_key_exist() {
        let manager = CatalogManager::new(Arc::new(MemoryKvBackend::default()));

        let present = CatalogNameKey::new("my-catalog");
        manager.create(present, false).await.unwrap();
        let found = manager.exists(present).await.unwrap();
        assert!(found);

        let absent = CatalogNameKey::new("my-wrong");
        let found = manager.exists(absent).await.unwrap();
        assert!(!found);
    }
}