1use std::collections::{BTreeMap, HashMap};
16use std::fmt::Display;
17
18use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
19use common_time::DatabaseTimeToLive;
20use futures::stream::BoxStream;
21use humantime_serde::re::humantime;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24
25use crate::ensure_values;
26use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
27use crate::key::txn_helper::TxnOpGetResponseSet;
28use crate::key::{
29 DeserializedValueWithBytes, MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX,
30};
31use crate::kv_backend::txn::Txn;
32use crate::kv_backend::KvBackendRef;
33use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
34use crate::rpc::store::RangeRequest;
35use crate::rpc::KeyValue;
36
/// Option-map key under which a database's time-to-live is stored when
/// converting between `HashMap<String, String>` and [`SchemaNameValue`].
const OPT_KEY_TTL: &str = "ttl";
38
/// Metadata key identifying a schema (database) inside a catalog.
///
/// Rendered (see its `Display` impl) as `{SCHEMA_NAME_KEY_PREFIX}/{catalog}/{schema}`,
/// e.g. `__schema_name/my-catalog/my-schema`. Both parts are borrowed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SchemaNameKey<'a> {
    /// Name of the catalog that owns the schema.
    pub catalog: &'a str,
    /// Name of the schema itself.
    pub schema: &'a str,
}
47
48impl Default for SchemaNameKey<'_> {
49 fn default() -> Self {
50 Self {
51 catalog: DEFAULT_CATALOG_NAME,
52 schema: DEFAULT_SCHEMA_NAME,
53 }
54 }
55}
56
/// Value stored under a [`SchemaNameKey`]: the options attached to a schema.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
    /// Time-to-live of the schema's data; `None` when no TTL option was given.
    /// `#[serde(default)]` makes a missing field deserialize as `None`.
    #[serde(default)]
    pub ttl: Option<DatabaseTimeToLive>,
    /// Every option other than `ttl`, kept in key order. `#[serde(default)]` lets
    /// values serialized before this field existed (e.g. `{"ttl":"10s"}`) still load.
    #[serde(default)]
    pub extra_options: BTreeMap<String, String>,
}
64
65impl Display for SchemaNameValue {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
68 writeln!(f, "'ttl'='{}'", ttl)?;
69 }
70 for (k, v) in self.extra_options.iter() {
71 writeln!(f, "'{k}'='{v}'")?;
72 }
73
74 Ok(())
75 }
76}
77
78impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
79 type Error = Error;
80
81 fn try_from(value: &HashMap<String, String>) -> std::result::Result<Self, Self::Error> {
82 let ttl = value
83 .get(OPT_KEY_TTL)
84 .map(|ttl_str| {
85 ttl_str.parse::<humantime::Duration>().map_err(|_| {
86 ParseOptionSnafu {
87 key: OPT_KEY_TTL,
88 value: ttl_str.clone(),
89 }
90 .build()
91 })
92 })
93 .transpose()?
94 .map(|ttl| ttl.into());
95 let extra_options = value
96 .iter()
97 .filter_map(|(k, v)| {
98 if k == OPT_KEY_TTL {
99 None
100 } else {
101 Some((k.clone(), v.clone()))
102 }
103 })
104 .collect();
105
106 Ok(Self { ttl, extra_options })
107 }
108}
109
110impl From<SchemaNameValue> for HashMap<String, String> {
111 fn from(value: SchemaNameValue) -> Self {
112 let mut opts = HashMap::new();
113 if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
114 opts.insert(OPT_KEY_TTL.to_string(), ttl);
115 }
116 opts.extend(
117 value
118 .extra_options
119 .iter()
120 .map(|(k, v)| (k.clone(), v.clone())),
121 );
122 opts
123 }
124}
125
126impl<'a> SchemaNameKey<'a> {
127 pub fn new(catalog: &'a str, schema: &'a str) -> Self {
128 Self { catalog, schema }
129 }
130
131 pub fn range_start_key(catalog: &str) -> String {
132 format!("{}/{}/", SCHEMA_NAME_KEY_PREFIX, catalog)
133 }
134}
135
136impl Display for SchemaNameKey<'_> {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(
139 f,
140 "{}/{}/{}",
141 SCHEMA_NAME_KEY_PREFIX, self.catalog, self.schema
142 )
143 }
144}
145
146impl<'a> MetadataKey<'a, SchemaNameKey<'a>> for SchemaNameKey<'_> {
147 fn to_bytes(&self) -> Vec<u8> {
148 self.to_string().into_bytes()
149 }
150
151 fn from_bytes(bytes: &'a [u8]) -> Result<SchemaNameKey<'a>> {
152 let key = std::str::from_utf8(bytes).map_err(|e| {
153 InvalidMetadataSnafu {
154 err_msg: format!(
155 "SchemaNameKey '{}' is not a valid UTF8 string: {e}",
156 String::from_utf8_lossy(bytes)
157 ),
158 }
159 .build()
160 })?;
161 SchemaNameKey::try_from(key)
162 }
163}
164
165pub fn schema_decoder(kv: KeyValue) -> Result<String> {
167 let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
168 let schema_name = SchemaNameKey::try_from(str)?;
169
170 Ok(schema_name.schema.to_string())
171}
172
173impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
174 type Error = Error;
175
176 fn try_from(s: &'a str) -> Result<Self> {
177 let captures = SCHEMA_NAME_KEY_PATTERN
178 .captures(s)
179 .context(InvalidMetadataSnafu {
180 err_msg: format!("Illegal SchemaNameKey format: '{s}'"),
181 })?;
182
183 Ok(Self {
185 catalog: captures.get(1).unwrap().as_str(),
186 schema: captures.get(2).unwrap().as_str(),
187 })
188 }
189}
190
/// Creates, reads, updates, deletes and lists [`SchemaNameKey`] → [`SchemaNameValue`]
/// entries in the metadata key-value backend.
#[derive(Clone)]
pub struct SchemaManager {
    /// Backend where schema metadata is stored.
    kv_backend: KvBackendRef,
}
195
/// Result of decoding a [`SchemaNameValue`] from a transaction's get responses, as returned
/// by the closure from `SchemaManager::build_update_txn`. `Ok(None)` presumably means no value
/// was found for the key (treated as an unexpected empty value by `SchemaManager::update`).
pub type SchemaNameDecodeResult = Result<Option<DeserializedValueWithBytes<SchemaNameValue>>>;
197
198impl SchemaManager {
199 pub fn new(kv_backend: KvBackendRef) -> Self {
200 Self { kv_backend }
201 }
202
203 pub async fn create(
205 &self,
206 schema: SchemaNameKey<'_>,
207 value: Option<SchemaNameValue>,
208 if_not_exists: bool,
209 ) -> Result<()> {
210 let _timer = crate::metrics::METRIC_META_CREATE_SCHEMA.start_timer();
211
212 let raw_key = schema.to_bytes();
213 let raw_value = value.unwrap_or_default().try_as_raw_value()?;
214 if self
215 .kv_backend
216 .put_conditionally(raw_key, raw_value, if_not_exists)
217 .await?
218 {
219 crate::metrics::METRIC_META_CREATE_SCHEMA_COUNTER.inc();
220 }
221
222 Ok(())
223 }
224
225 pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
226 let raw_key = schema.to_bytes();
227
228 self.kv_backend.exists(&raw_key).await
229 }
230
231 pub async fn get(
232 &self,
233 schema: SchemaNameKey<'_>,
234 ) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
235 let raw_key = schema.to_bytes();
236 self.kv_backend
237 .get(&raw_key)
238 .await?
239 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
240 .transpose()
241 }
242
243 pub async fn delete(&self, schema: SchemaNameKey<'_>) -> Result<()> {
245 let raw_key = schema.to_bytes();
246 self.kv_backend.delete(&raw_key, false).await?;
247
248 Ok(())
249 }
250
251 pub(crate) fn build_update_txn(
252 &self,
253 schema: SchemaNameKey<'_>,
254 current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
255 new_schema_value: &SchemaNameValue,
256 ) -> Result<(
257 Txn,
258 impl FnOnce(&mut TxnOpGetResponseSet) -> SchemaNameDecodeResult,
259 )> {
260 let raw_key = schema.to_bytes();
261 let raw_value = current_schema_value.get_raw_bytes();
262 let new_raw_value: Vec<u8> = new_schema_value.try_as_raw_value()?;
263
264 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
265
266 Ok((
267 txn,
268 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
269 ))
270 }
271
272 pub async fn update(
274 &self,
275 schema: SchemaNameKey<'_>,
276 current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
277 new_schema_value: &SchemaNameValue,
278 ) -> Result<()> {
279 let (txn, on_failure) =
280 self.build_update_txn(schema, current_schema_value, new_schema_value)?;
281 let mut r = self.kv_backend.txn(txn).await?;
282
283 if !r.succeeded {
284 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
285 let remote_schema_value = on_failure(&mut set)?
286 .context(error::UnexpectedSnafu {
287 err_msg:
288 "Reads the empty schema name value in comparing operation of updating schema name value",
289 })?
290 .into_inner();
291
292 let op_name = "the updating schema name value";
293 ensure_values!(&remote_schema_value, new_schema_value, op_name);
294 }
295
296 Ok(())
297 }
298
299 pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
301 let start_key = SchemaNameKey::range_start_key(catalog);
302 let req = RangeRequest::new().with_prefix(start_key.as_bytes());
303
304 let stream = PaginationStream::new(
305 self.kv_backend.clone(),
306 req,
307 DEFAULT_PAGE_SIZE,
308 schema_decoder,
309 )
310 .into_stream();
311
312 Box::pin(stream)
313 }
314}
315
/// Owned catalog/schema name pair. A borrowed [`SchemaNameKey`] can be made from
/// `&SchemaName` through its `From` impl.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct SchemaName {
    /// Name of the catalog that owns the schema.
    pub catalog_name: String,
    /// Name of the schema itself.
    pub schema_name: String,
}
321
322impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
323 fn from(value: &'a SchemaName) -> Self {
324 Self {
325 catalog: &value.catalog_name,
326 schema: &value.schema_name,
327 }
328 }
329}
330
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    #[test]
    fn test_display_schema_value() {
        let schema_value = SchemaNameValue {
            ttl: None,
            ..Default::default()
        };
        assert_eq!("", schema_value.to_string());

        let schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(9).into()),
            ..Default::default()
        };
        assert_eq!("'ttl'='9s'\n", schema_value.to_string());

        // A zero TTL is rendered as "forever".
        let schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(0).into()),
            ..Default::default()
        };
        assert_eq!("'ttl'='forever'\n", schema_value.to_string());
    }

    #[test]
    fn test_serialization() {
        let key = SchemaNameKey::new("my-catalog", "my-schema");
        assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema");

        let parsed = SchemaNameKey::from_bytes(b"__schema_name/my-catalog/my-schema").unwrap();

        assert_eq!(key, parsed);

        let value = SchemaNameValue {
            ttl: Some(Duration::from_secs(10).into()),
            ..Default::default()
        };
        let mut opts: HashMap<String, String> = HashMap::new();
        opts.insert("ttl".to_string(), "10s".to_string());
        let from_value = SchemaNameValue::try_from(&opts).unwrap();
        assert_eq!(value, from_value);

        let parsed = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(value), parsed);

        let forever = SchemaNameValue {
            ttl: Some(Default::default()),
            ..Default::default()
        };
        let parsed = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(forever), parsed);

        // A database TTL cannot be "instant".
        let instant_err = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
        );
        assert!(instant_err.is_err());

        let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
        assert!(none.is_none());

        let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
        assert!(err_empty.is_err());
    }

    #[test]
    fn test_extra_options_compatibility() {
        // Options without a TTL all land in `extra_options`.
        let mut opts: HashMap<String, String> = HashMap::new();
        opts.insert("foo".to_string(), "bar".to_string());
        opts.insert("baz".to_string(), "qux".to_string());
        let value = SchemaNameValue::try_from(&opts).unwrap();
        assert_eq!(value.ttl, None);
        assert_eq!(value.extra_options.get("foo"), Some(&"bar".to_string()));
        assert_eq!(value.extra_options.get("baz"), Some(&"qux".to_string()));

        // Converting back yields the same extra options and no `ttl` key.
        let opts_back: HashMap<String, String> = value.clone().into();
        assert_eq!(opts_back.get("foo"), Some(&"bar".to_string()));
        assert_eq!(opts_back.get("baz"), Some(&"qux".to_string()));
        assert!(!opts_back.contains_key("ttl"));

        // A TTL mixed with other options is split into `ttl` and `extra_options`.
        let mut opts: HashMap<String, String> = HashMap::new();
        opts.insert("ttl".to_string(), "5m".to_string());
        opts.insert("opt1".to_string(), "val1".to_string());
        let value = SchemaNameValue::try_from(&opts).unwrap();
        assert_eq!(value.ttl, Some(Duration::from_secs(300).into()));
        assert_eq!(value.extra_options.get("opt1"), Some(&"val1".to_string()));

        // JSON serialization round-trips.
        let json = serde_json::to_string(&value).unwrap();
        let deserialized: SchemaNameValue = serde_json::from_str(&json).unwrap();
        assert_eq!(value, deserialized);

        // Extra options are included in the `Display` output.
        let mut value = SchemaNameValue::default();
        value
            .extra_options
            .insert("foo".to_string(), "bar".to_string());
        let display = value.to_string();
        assert!(display.contains("'foo'='bar'"));
    }

    #[test]
    fn test_backward_compatibility_with_old_format() {
        // Values written before `extra_options` existed only carry `ttl`.
        let json = r#"{"ttl":"10s"}"#;
        let parsed = SchemaNameValue::try_from_raw_value(json.as_bytes()).unwrap();
        assert_eq!(
            parsed,
            Some(SchemaNameValue {
                ttl: Some(Duration::from_secs(10).into()),
                extra_options: BTreeMap::new(),
            })
        );

        // A stored `null` decodes to no value at all.
        let json = r#"null"#;
        let parsed = SchemaNameValue::try_from_raw_value(json.as_bytes()).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn test_forward_compatibility_with_new_options() {
        // The current format carries both `ttl` and `extra_options`.
        let json = r#"{"ttl":"15s","extra_options":{"foo":"bar","baz":"qux"}}"#;
        let parsed = SchemaNameValue::try_from_raw_value(json.as_bytes()).unwrap();
        let mut expected_options = BTreeMap::new();
        expected_options.insert("foo".to_string(), "bar".to_string());
        expected_options.insert("baz".to_string(), "qux".to_string());
        assert_eq!(
            parsed,
            Some(SchemaNameValue {
                ttl: Some(Duration::from_secs(15).into()),
                extra_options: expected_options,
            })
        );
    }

    #[tokio::test]
    async fn test_key_exist() {
        let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
        let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(schema_key, None, false).await.unwrap();

        assert!(manager.exists(schema_key).await.unwrap());

        let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong");

        assert!(!manager.exists(wrong_schema_key).await.unwrap());
    }

    #[tokio::test]
    async fn test_update_schema_value() {
        let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
        let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(schema_key, None, false).await.unwrap();

        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        let new_schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(10).into()),
            ..Default::default()
        };
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        // Repeating the update with the now-stale current value still succeeds,
        // because the stored value already equals `new_schema_value`.
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        // A stale current value that disagrees with both the store and the new value fails.
        let new_schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(40).into()),
            ..Default::default()
        };
        let incorrect_schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(20).into()),
            ..Default::default()
        }
        .try_as_raw_value()
        .unwrap();
        let incorrect_schema_value =
            DeserializedValueWithBytes::from_inner_slice(&incorrect_schema_value).unwrap();

        manager
            .update(schema_key, &incorrect_schema_value, &new_schema_value)
            .await
            .unwrap_err();

        // Clearing the TTL with the real current value works and is persisted.
        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        let new_schema_value = SchemaNameValue {
            ttl: None,
            ..Default::default()
        };
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        assert_eq!(new_schema_value, *current_schema_value);
    }
}