common_meta/key/
schema_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17
18use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
19use common_time::DatabaseTimeToLive;
20use futures::stream::BoxStream;
21use humantime_serde::re::humantime;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24
25use crate::ensure_values;
26use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
27use crate::key::txn_helper::TxnOpGetResponseSet;
28use crate::key::{
29    DeserializedValueWithBytes, MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX,
30};
31use crate::kv_backend::txn::Txn;
32use crate::kv_backend::KvBackendRef;
33use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
34use crate::rpc::store::RangeRequest;
35use crate::rpc::KeyValue;
36
/// Key of the time-to-live entry in a schema's string option map.
const OPT_KEY_TTL: &str = "ttl";
38
/// The schema name key; it indexes all schema names that belong to `{catalog_name}`.
///
/// The layout: `__schema_name/{catalog_name}/{schema_name}`.
///
/// The key only borrows its two names, so it is cheap to copy. It derives `Eq`
/// and `Hash` (like the owned `SchemaName`) so that it can be used directly as
/// a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SchemaNameKey<'a> {
    /// Name of the catalog the schema belongs to.
    pub catalog: &'a str,
    /// Name of the schema.
    pub schema: &'a str,
}
47
48impl Default for SchemaNameKey<'_> {
49    fn default() -> Self {
50        Self {
51            catalog: DEFAULT_CATALOG_NAME,
52            schema: DEFAULT_SCHEMA_NAME,
53        }
54    }
55}
56
/// The value stored under a [`SchemaNameKey`]; it carries the schema-level options.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
    /// Optional time-to-live setting of the schema. Thanks to `#[serde(default)]`
    /// a serialized value without this field deserializes to `None`.
    #[serde(default)]
    pub ttl: Option<DatabaseTimeToLive>,
}
62
63impl Display for SchemaNameValue {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
66            write!(f, "ttl='{}'", ttl)?;
67        }
68
69        Ok(())
70    }
71}
72
73impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
74    type Error = Error;
75
76    fn try_from(value: &HashMap<String, String>) -> std::result::Result<Self, Self::Error> {
77        let ttl = value
78            .get(OPT_KEY_TTL)
79            .map(|ttl_str| {
80                ttl_str.parse::<humantime::Duration>().map_err(|_| {
81                    ParseOptionSnafu {
82                        key: OPT_KEY_TTL,
83                        value: ttl_str.clone(),
84                    }
85                    .build()
86                })
87            })
88            .transpose()?
89            .map(|ttl| ttl.into());
90        Ok(Self { ttl })
91    }
92}
93
94impl From<SchemaNameValue> for HashMap<String, String> {
95    fn from(value: SchemaNameValue) -> Self {
96        let mut opts = HashMap::new();
97        if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
98            opts.insert(OPT_KEY_TTL.to_string(), ttl);
99        }
100        opts
101    }
102}
103
104impl<'a> SchemaNameKey<'a> {
105    pub fn new(catalog: &'a str, schema: &'a str) -> Self {
106        Self { catalog, schema }
107    }
108
109    pub fn range_start_key(catalog: &str) -> String {
110        format!("{}/{}/", SCHEMA_NAME_KEY_PREFIX, catalog)
111    }
112}
113
114impl Display for SchemaNameKey<'_> {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        write!(
117            f,
118            "{}/{}/{}",
119            SCHEMA_NAME_KEY_PREFIX, self.catalog, self.schema
120        )
121    }
122}
123
124impl<'a> MetadataKey<'a, SchemaNameKey<'a>> for SchemaNameKey<'_> {
125    fn to_bytes(&self) -> Vec<u8> {
126        self.to_string().into_bytes()
127    }
128
129    fn from_bytes(bytes: &'a [u8]) -> Result<SchemaNameKey<'a>> {
130        let key = std::str::from_utf8(bytes).map_err(|e| {
131            InvalidMetadataSnafu {
132                err_msg: format!(
133                    "SchemaNameKey '{}' is not a valid UTF8 string: {e}",
134                    String::from_utf8_lossy(bytes)
135                ),
136            }
137            .build()
138        })?;
139        SchemaNameKey::try_from(key)
140    }
141}
142
143/// Decodes `KeyValue` to {schema}
144pub fn schema_decoder(kv: KeyValue) -> Result<String> {
145    let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
146    let schema_name = SchemaNameKey::try_from(str)?;
147
148    Ok(schema_name.schema.to_string())
149}
150
151impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
152    type Error = Error;
153
154    fn try_from(s: &'a str) -> Result<Self> {
155        let captures = SCHEMA_NAME_KEY_PATTERN
156            .captures(s)
157            .context(InvalidMetadataSnafu {
158                err_msg: format!("Illegal SchemaNameKey format: '{s}'"),
159            })?;
160
161        // Safety: pass the regex check above
162        Ok(Self {
163            catalog: captures.get(1).unwrap().as_str(),
164            schema: captures.get(2).unwrap().as_str(),
165        })
166    }
167}
168
/// Manages schema name metadata stored in a key-value backend.
#[derive(Clone)]
pub struct SchemaManager {
    /// Backend that every schema name entry is read from and written to.
    kv_backend: KvBackendRef,
}
173
/// Outcome of decoding a schema name value out of a transaction response set:
/// an error, no value, or the deserialized value together with its raw bytes.
pub type SchemaNameDecodeResult = Result<Option<DeserializedValueWithBytes<SchemaNameValue>>>;
175
176impl SchemaManager {
177    pub fn new(kv_backend: KvBackendRef) -> Self {
178        Self { kv_backend }
179    }
180
181    /// Creates `SchemaNameKey`.
182    pub async fn create(
183        &self,
184        schema: SchemaNameKey<'_>,
185        value: Option<SchemaNameValue>,
186        if_not_exists: bool,
187    ) -> Result<()> {
188        let _timer = crate::metrics::METRIC_META_CREATE_SCHEMA.start_timer();
189
190        let raw_key = schema.to_bytes();
191        let raw_value = value.unwrap_or_default().try_as_raw_value()?;
192        if self
193            .kv_backend
194            .put_conditionally(raw_key, raw_value, if_not_exists)
195            .await?
196        {
197            crate::metrics::METRIC_META_CREATE_SCHEMA_COUNTER.inc();
198        }
199
200        Ok(())
201    }
202
203    pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
204        let raw_key = schema.to_bytes();
205
206        self.kv_backend.exists(&raw_key).await
207    }
208
209    pub async fn get(
210        &self,
211        schema: SchemaNameKey<'_>,
212    ) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
213        let raw_key = schema.to_bytes();
214        self.kv_backend
215            .get(&raw_key)
216            .await?
217            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
218            .transpose()
219    }
220
221    /// Deletes a [SchemaNameKey].
222    pub async fn delete(&self, schema: SchemaNameKey<'_>) -> Result<()> {
223        let raw_key = schema.to_bytes();
224        self.kv_backend.delete(&raw_key, false).await?;
225
226        Ok(())
227    }
228
229    pub(crate) fn build_update_txn(
230        &self,
231        schema: SchemaNameKey<'_>,
232        current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
233        new_schema_value: &SchemaNameValue,
234    ) -> Result<(
235        Txn,
236        impl FnOnce(&mut TxnOpGetResponseSet) -> SchemaNameDecodeResult,
237    )> {
238        let raw_key = schema.to_bytes();
239        let raw_value = current_schema_value.get_raw_bytes();
240        let new_raw_value: Vec<u8> = new_schema_value.try_as_raw_value()?;
241
242        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
243
244        Ok((
245            txn,
246            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
247        ))
248    }
249
250    /// Updates a [SchemaNameKey].
251    pub async fn update(
252        &self,
253        schema: SchemaNameKey<'_>,
254        current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
255        new_schema_value: &SchemaNameValue,
256    ) -> Result<()> {
257        let (txn, on_failure) =
258            self.build_update_txn(schema, current_schema_value, new_schema_value)?;
259        let mut r = self.kv_backend.txn(txn).await?;
260
261        if !r.succeeded {
262            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
263            let remote_schema_value = on_failure(&mut set)?
264                .context(error::UnexpectedSnafu {
265                    err_msg:
266                        "Reads the empty schema name value in comparing operation of updating schema name value",
267                })?
268                .into_inner();
269
270            let op_name = "the updating schema name value";
271            ensure_values!(&remote_schema_value, new_schema_value, op_name);
272        }
273
274        Ok(())
275    }
276
277    /// Returns a schema stream, it lists all schemas belong to the target `catalog`.
278    pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
279        let start_key = SchemaNameKey::range_start_key(catalog);
280        let req = RangeRequest::new().with_prefix(start_key.as_bytes());
281
282        let stream = PaginationStream::new(
283            self.kv_backend.clone(),
284            req,
285            DEFAULT_PAGE_SIZE,
286            schema_decoder,
287        )
288        .into_stream();
289
290        Box::pin(stream)
291    }
292}
293
/// An owned, fully qualified schema name: a catalog name plus a schema name.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct SchemaName {
    /// Name of the catalog the schema belongs to.
    pub catalog_name: String,
    /// Name of the schema.
    pub schema_name: String,
}
299
300impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
301    fn from(value: &'a SchemaName) -> Self {
302        Self {
303            catalog: &value.catalog_name,
304            schema: &value.schema_name,
305        }
306    }
307}
308
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Builds a schema value whose TTL is `secs` seconds.
    fn value_with_ttl_secs(secs: u64) -> SchemaNameValue {
        SchemaNameValue {
            ttl: Some(Duration::from_secs(secs).into()),
        }
    }

    /// Builds a manager backed by a fresh in-memory key-value store.
    fn memory_manager() -> SchemaManager {
        SchemaManager::new(Arc::new(MemoryKvBackend::default()))
    }

    #[test]
    fn test_display_schema_value() {
        // No TTL renders as an empty string.
        let without_ttl = SchemaNameValue { ttl: None };
        assert_eq!("", without_ttl.to_string());

        let nine_secs = value_with_ttl_secs(9);
        assert_eq!("ttl='9s'", nine_secs.to_string());

        // A zero duration is displayed as "forever".
        let zero_secs = value_with_ttl_secs(0);
        assert_eq!("ttl='forever'", zero_secs.to_string());
    }

    #[test]
    fn test_serialization() {
        // Key: rendering and parsing are inverse operations.
        let key = SchemaNameKey::new("my-catalog", "my-schema");
        assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema");

        let decoded_key =
            SchemaNameKey::from_bytes(b"__schema_name/my-catalog/my-schema").unwrap();
        assert_eq!(key, decoded_key);

        // Value: built from a string option map.
        let ten_secs = value_with_ttl_secs(10);
        let mut options: HashMap<String, String> = HashMap::new();
        options.insert("ttl".to_string(), "10s".to_string());
        let from_options = SchemaNameValue::try_from(&options).unwrap();
        assert_eq!(ten_secs, from_options);

        // Value: decoded from raw JSON bytes.
        let from_json = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(ten_secs), from_json);

        let forever = SchemaNameValue {
            ttl: Some(Default::default()),
        };
        let from_json = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(forever), from_json);

        // "instant" is rejected as a schema TTL.
        let instant_result = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
        );
        assert!(instant_result.is_err());

        // JSON `null` decodes to no value, while empty input is an error.
        let from_null = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
        assert!(from_null.is_none());

        let from_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
        assert!(from_empty.is_err());
    }

    #[tokio::test]
    async fn test_key_exist() {
        let manager = memory_manager();
        let created_key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(created_key, None, false).await.unwrap();

        assert!(manager.exists(created_key).await.unwrap());

        // A key that was never created must not be reported as existing.
        let missing_key = SchemaNameKey::new("my-catalog", "my-wrong");

        assert!(!manager.exists(missing_key).await.unwrap());
    }

    #[tokio::test]
    async fn test_update_schema_value() {
        let manager = memory_manager();
        let key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(key, None, false).await.unwrap();

        let initial = manager.get(key).await.unwrap().unwrap();
        let ttl_10s = value_with_ttl_secs(10);
        manager.update(key, &initial, &ttl_10s).await.unwrap();

        // Repeating the update with the now-stale `initial` value is still
        // accepted, because `update` tolerates a failed compare when the stored
        // value already equals the requested one.
        manager.update(key, &initial, &ttl_10s).await.unwrap();

        // An expected value that matches neither the stored nor the requested
        // value must make the update fail.
        let ttl_40s = value_with_ttl_secs(40);
        let stale_bytes = value_with_ttl_secs(20).try_as_raw_value().unwrap();
        let stale = DeserializedValueWithBytes::from_inner_slice(&stale_bytes).unwrap();

        manager.update(key, &stale, &ttl_40s).await.unwrap_err();

        // Clearing the TTL from the actually stored value succeeds.
        let stored = manager.get(key).await.unwrap().unwrap();
        let no_ttl = SchemaNameValue { ttl: None };
        manager.update(key, &stored, &no_ttl).await.unwrap();

        let stored = manager.get(key).await.unwrap().unwrap();
        assert_eq!(no_ttl, *stored);
    }
}