common_meta/key/
schema_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt::Display;
17
18use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
19use common_time::DatabaseTimeToLive;
20use futures::stream::BoxStream;
21use humantime_serde::re::humantime;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24
25use crate::ensure_values;
26use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
27use crate::key::txn_helper::TxnOpGetResponseSet;
28use crate::key::{
29    DeserializedValueWithBytes, MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX,
30};
31use crate::kv_backend::txn::Txn;
32use crate::kv_backend::KvBackendRef;
33use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
34use crate::rpc::store::RangeRequest;
35use crate::rpc::KeyValue;
36
/// Key under which a schema's time-to-live is stored in string option maps.
const OPT_KEY_TTL: &str = "ttl";
38
/// The schema name key; it indexes all schema names that belong to a catalog.
///
/// The layout: `__schema_name/{catalog_name}/{schema_name}`.
///
/// The key only borrows its two name components, so it is `Copy`. It also
/// implements `Eq` and `Hash` (both fields are plain string slices), which
/// lets it be used directly in sets and as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SchemaNameKey<'a> {
    /// Name of the catalog the schema belongs to.
    pub catalog: &'a str,
    /// Name of the schema inside `catalog`.
    pub schema: &'a str,
}
47
48impl Default for SchemaNameKey<'_> {
49    fn default() -> Self {
50        Self {
51            catalog: DEFAULT_CATALOG_NAME,
52            schema: DEFAULT_SCHEMA_NAME,
53        }
54    }
55}
56
57#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
58pub struct SchemaNameValue {
59    #[serde(default)]
60    pub ttl: Option<DatabaseTimeToLive>,
61    #[serde(default)]
62    pub extra_options: BTreeMap<String, String>,
63}
64
65impl Display for SchemaNameValue {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
68            writeln!(f, "'ttl'='{}'", ttl)?;
69        }
70        for (k, v) in self.extra_options.iter() {
71            writeln!(f, "'{k}'='{v}'")?;
72        }
73
74        Ok(())
75    }
76}
77
78impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
79    type Error = Error;
80
81    fn try_from(value: &HashMap<String, String>) -> std::result::Result<Self, Self::Error> {
82        let ttl = value
83            .get(OPT_KEY_TTL)
84            .map(|ttl_str| {
85                ttl_str.parse::<humantime::Duration>().map_err(|_| {
86                    ParseOptionSnafu {
87                        key: OPT_KEY_TTL,
88                        value: ttl_str.clone(),
89                    }
90                    .build()
91                })
92            })
93            .transpose()?
94            .map(|ttl| ttl.into());
95        let extra_options = value
96            .iter()
97            .filter_map(|(k, v)| {
98                if k == OPT_KEY_TTL {
99                    None
100                } else {
101                    Some((k.clone(), v.clone()))
102                }
103            })
104            .collect();
105
106        Ok(Self { ttl, extra_options })
107    }
108}
109
110impl From<SchemaNameValue> for HashMap<String, String> {
111    fn from(value: SchemaNameValue) -> Self {
112        let mut opts = HashMap::new();
113        if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
114            opts.insert(OPT_KEY_TTL.to_string(), ttl);
115        }
116        opts.extend(
117            value
118                .extra_options
119                .iter()
120                .map(|(k, v)| (k.clone(), v.clone())),
121        );
122        opts
123    }
124}
125
126impl<'a> SchemaNameKey<'a> {
127    pub fn new(catalog: &'a str, schema: &'a str) -> Self {
128        Self { catalog, schema }
129    }
130
131    pub fn range_start_key(catalog: &str) -> String {
132        format!("{}/{}/", SCHEMA_NAME_KEY_PREFIX, catalog)
133    }
134}
135
136impl Display for SchemaNameKey<'_> {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(
139            f,
140            "{}/{}/{}",
141            SCHEMA_NAME_KEY_PREFIX, self.catalog, self.schema
142        )
143    }
144}
145
146impl<'a> MetadataKey<'a, SchemaNameKey<'a>> for SchemaNameKey<'_> {
147    fn to_bytes(&self) -> Vec<u8> {
148        self.to_string().into_bytes()
149    }
150
151    fn from_bytes(bytes: &'a [u8]) -> Result<SchemaNameKey<'a>> {
152        let key = std::str::from_utf8(bytes).map_err(|e| {
153            InvalidMetadataSnafu {
154                err_msg: format!(
155                    "SchemaNameKey '{}' is not a valid UTF8 string: {e}",
156                    String::from_utf8_lossy(bytes)
157                ),
158            }
159            .build()
160        })?;
161        SchemaNameKey::try_from(key)
162    }
163}
164
165/// Decodes `KeyValue` to {schema}
166pub fn schema_decoder(kv: KeyValue) -> Result<String> {
167    let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
168    let schema_name = SchemaNameKey::try_from(str)?;
169
170    Ok(schema_name.schema.to_string())
171}
172
173impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
174    type Error = Error;
175
176    fn try_from(s: &'a str) -> Result<Self> {
177        let captures = SCHEMA_NAME_KEY_PATTERN
178            .captures(s)
179            .context(InvalidMetadataSnafu {
180                err_msg: format!("Illegal SchemaNameKey format: '{s}'"),
181            })?;
182
183        // Safety: pass the regex check above
184        Ok(Self {
185            catalog: captures.get(1).unwrap().as_str(),
186            schema: captures.get(2).unwrap().as_str(),
187        })
188    }
189}
190
191#[derive(Clone)]
192pub struct SchemaManager {
193    kv_backend: KvBackendRef,
194}
195
196pub type SchemaNameDecodeResult = Result<Option<DeserializedValueWithBytes<SchemaNameValue>>>;
197
198impl SchemaManager {
199    pub fn new(kv_backend: KvBackendRef) -> Self {
200        Self { kv_backend }
201    }
202
203    /// Creates `SchemaNameKey`.
204    pub async fn create(
205        &self,
206        schema: SchemaNameKey<'_>,
207        value: Option<SchemaNameValue>,
208        if_not_exists: bool,
209    ) -> Result<()> {
210        let _timer = crate::metrics::METRIC_META_CREATE_SCHEMA.start_timer();
211
212        let raw_key = schema.to_bytes();
213        let raw_value = value.unwrap_or_default().try_as_raw_value()?;
214        if self
215            .kv_backend
216            .put_conditionally(raw_key, raw_value, if_not_exists)
217            .await?
218        {
219            crate::metrics::METRIC_META_CREATE_SCHEMA_COUNTER.inc();
220        }
221
222        Ok(())
223    }
224
225    pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
226        let raw_key = schema.to_bytes();
227
228        self.kv_backend.exists(&raw_key).await
229    }
230
231    pub async fn get(
232        &self,
233        schema: SchemaNameKey<'_>,
234    ) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
235        let raw_key = schema.to_bytes();
236        self.kv_backend
237            .get(&raw_key)
238            .await?
239            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
240            .transpose()
241    }
242
243    /// Deletes a [SchemaNameKey].
244    pub async fn delete(&self, schema: SchemaNameKey<'_>) -> Result<()> {
245        let raw_key = schema.to_bytes();
246        self.kv_backend.delete(&raw_key, false).await?;
247
248        Ok(())
249    }
250
251    pub(crate) fn build_update_txn(
252        &self,
253        schema: SchemaNameKey<'_>,
254        current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
255        new_schema_value: &SchemaNameValue,
256    ) -> Result<(
257        Txn,
258        impl FnOnce(&mut TxnOpGetResponseSet) -> SchemaNameDecodeResult,
259    )> {
260        let raw_key = schema.to_bytes();
261        let raw_value = current_schema_value.get_raw_bytes();
262        let new_raw_value: Vec<u8> = new_schema_value.try_as_raw_value()?;
263
264        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
265
266        Ok((
267            txn,
268            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
269        ))
270    }
271
272    /// Updates a [SchemaNameKey].
273    pub async fn update(
274        &self,
275        schema: SchemaNameKey<'_>,
276        current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
277        new_schema_value: &SchemaNameValue,
278    ) -> Result<()> {
279        let (txn, on_failure) =
280            self.build_update_txn(schema, current_schema_value, new_schema_value)?;
281        let mut r = self.kv_backend.txn(txn).await?;
282
283        if !r.succeeded {
284            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
285            let remote_schema_value = on_failure(&mut set)?
286                .context(error::UnexpectedSnafu {
287                    err_msg:
288                        "Reads the empty schema name value in comparing operation of updating schema name value",
289                })?
290                .into_inner();
291
292            let op_name = "the updating schema name value";
293            ensure_values!(&remote_schema_value, new_schema_value, op_name);
294        }
295
296        Ok(())
297    }
298
299    /// Returns a schema stream, it lists all schemas belong to the target `catalog`.
300    pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
301        let start_key = SchemaNameKey::range_start_key(catalog);
302        let req = RangeRequest::new().with_prefix(start_key.as_bytes());
303
304        let stream = PaginationStream::new(
305            self.kv_backend.clone(),
306            req,
307            DEFAULT_PAGE_SIZE,
308            schema_decoder,
309        )
310        .into_stream();
311
312        Box::pin(stream)
313    }
314}
315
316#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
317pub struct SchemaName {
318    pub catalog_name: String,
319    pub schema_name: String,
320}
321
322impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
323    fn from(value: &'a SchemaName) -> Self {
324        Self {
325            catalog: &value.catalog_name,
326            schema: &value.schema_name,
327        }
328    }
329}
330
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Builds a value whose only setting is a TTL of `secs` seconds.
    fn value_with_ttl_secs(secs: u64) -> SchemaNameValue {
        SchemaNameValue {
            ttl: Some(Duration::from_secs(secs).into()),
            ..Default::default()
        }
    }

    /// Creates a manager on top of a fresh in-memory backend.
    fn memory_manager() -> SchemaManager {
        SchemaManager::new(Arc::new(MemoryKvBackend::default()))
    }

    #[test]
    fn test_display_schema_value() {
        let cases = [
            (
                SchemaNameValue {
                    ttl: None,
                    ..Default::default()
                },
                "",
            ),
            (value_with_ttl_secs(9), "'ttl'='9s'\n"),
            (value_with_ttl_secs(0), "'ttl'='forever'\n"),
        ];

        for (schema_value, expected) in cases {
            assert_eq!(expected, schema_value.to_string());
        }
    }

    #[test]
    fn test_serialization() {
        // Key rendering and parsing must round-trip.
        let key = SchemaNameKey::new("my-catalog", "my-schema");
        assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema");

        let parsed_key = SchemaNameKey::from_bytes(b"__schema_name/my-catalog/my-schema").unwrap();
        assert_eq!(key, parsed_key);

        // The same TTL must come out of the option map and the raw JSON value.
        let ten_secs = value_with_ttl_secs(10);
        let opts: HashMap<String, String> =
            HashMap::from([("ttl".to_string(), "10s".to_string())]);
        let from_opts = SchemaNameValue::try_from(&opts).unwrap();
        assert_eq!(ten_secs, from_opts);

        let from_raw = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(ten_secs), from_raw);

        let forever = SchemaNameValue {
            ttl: Some(Default::default()),
            ..Default::default()
        };
        let from_raw = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(forever), from_raw);

        let instant_err = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
        );
        assert!(instant_err.is_err());

        let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
        assert!(none.is_none());

        let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
        assert!(err_empty.is_err());
    }

    #[test]
    fn test_extra_options_compatibility() {
        // Test with extra_options only
        let opts: HashMap<String, String> = HashMap::from([
            ("foo".to_string(), "bar".to_string()),
            ("baz".to_string(), "qux".to_string()),
        ]);
        let value = SchemaNameValue::try_from(&opts).unwrap();
        assert_eq!(value.ttl, None);
        assert_eq!(value.extra_options.get("foo"), Some(&"bar".to_string()));
        assert_eq!(value.extra_options.get("baz"), Some(&"qux".to_string()));

        // Test round-trip conversion
        let opts_back: HashMap<String, String> = value.clone().into();
        assert_eq!(opts_back.get("foo"), Some(&"bar".to_string()));
        assert_eq!(opts_back.get("baz"), Some(&"qux".to_string()));
        assert!(!opts_back.contains_key("ttl"));

        // Test with both ttl and extra_options
        let opts: HashMap<String, String> = HashMap::from([
            ("ttl".to_string(), "5m".to_string()),
            ("opt1".to_string(), "val1".to_string()),
        ]);
        let value = SchemaNameValue::try_from(&opts).unwrap();
        assert_eq!(value.ttl, Some(Duration::from_secs(300).into()));
        assert_eq!(value.extra_options.get("opt1"), Some(&"val1".to_string()));

        // Test serialization/deserialization compatibility
        let json = serde_json::to_string(&value).unwrap();
        let deserialized: SchemaNameValue = serde_json::from_str(&json).unwrap();
        assert_eq!(value, deserialized);

        // Test display includes extra_options
        let mut value = SchemaNameValue::default();
        value
            .extra_options
            .insert("foo".to_string(), "bar".to_string());
        let display = value.to_string();
        assert!(display.contains("'foo'='bar'"));
    }

    #[test]
    fn test_backward_compatibility_with_old_format() {
        // Simulate old format: only ttl, no extra_options
        let parsed = SchemaNameValue::try_from_raw_value(r#"{"ttl":"10s"}"#.as_bytes()).unwrap();
        let expected = SchemaNameValue {
            ttl: Some(Duration::from_secs(10).into()),
            extra_options: BTreeMap::new(),
        };
        assert_eq!(parsed, Some(expected));

        // Simulate old format: null value
        let parsed = SchemaNameValue::try_from_raw_value(r#"null"#.as_bytes()).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn test_forward_compatibility_with_new_options() {
        // Simulate new format: ttl + extra_options
        let json = r#"{"ttl":"15s","extra_options":{"foo":"bar","baz":"qux"}}"#;
        let parsed = SchemaNameValue::try_from_raw_value(json.as_bytes()).unwrap();

        let expected_options = BTreeMap::from([
            ("foo".to_string(), "bar".to_string()),
            ("baz".to_string(), "qux".to_string()),
        ]);
        assert_eq!(
            parsed,
            Some(SchemaNameValue {
                ttl: Some(Duration::from_secs(15).into()),
                extra_options: expected_options,
            })
        );
    }

    #[tokio::test]
    async fn test_key_exist() {
        let manager = memory_manager();
        let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(schema_key, None, false).await.unwrap();

        assert!(manager.exists(schema_key).await.unwrap());

        let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong");

        assert!(!manager.exists(wrong_schema_key).await.unwrap());
    }

    #[tokio::test]
    async fn test_update_schema_value() {
        let manager = memory_manager();
        let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(schema_key, None, false).await.unwrap();

        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        let new_schema_value = value_with_ttl_secs(10);
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        // Update with the same value, should be ok
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        // A stale "current" value that matches neither the stored value nor
        // the new one must make the update fail.
        let new_schema_value = value_with_ttl_secs(40);
        let incorrect_raw = value_with_ttl_secs(20).try_as_raw_value().unwrap();
        let incorrect_schema_value =
            DeserializedValueWithBytes::from_inner_slice(&incorrect_raw).unwrap();

        manager
            .update(schema_key, &incorrect_schema_value, &new_schema_value)
            .await
            .unwrap_err();

        // Clearing the TTL against the real current value succeeds and is
        // what a subsequent read returns.
        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        let new_schema_value = SchemaNameValue {
            ttl: None,
            ..Default::default()
        };
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        assert_eq!(new_schema_value, *current_schema_value);
    }
}