1use std::collections::HashMap;
16use std::fmt::Display;
17
18use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
19use common_time::DatabaseTimeToLive;
20use futures::stream::BoxStream;
21use humantime_serde::re::humantime;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24
25use crate::ensure_values;
26use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
27use crate::key::txn_helper::TxnOpGetResponseSet;
28use crate::key::{
29 DeserializedValueWithBytes, MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX,
30};
31use crate::kv_backend::txn::Txn;
32use crate::kv_backend::KvBackendRef;
33use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
34use crate::rpc::store::RangeRequest;
35use crate::rpc::KeyValue;
36
/// Key under which the TTL setting is read from and written to a schema's
/// string option map.
const OPT_KEY_TTL: &str = "ttl";
38
/// Borrowed key identifying one schema by its catalog and schema names.
///
/// Its `Display` impl renders it as `{SCHEMA_NAME_KEY_PREFIX}/{catalog}/{schema}`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SchemaNameKey<'a> {
    /// Name of the catalog the schema belongs to.
    pub catalog: &'a str,
    /// Name of the schema.
    pub schema: &'a str,
}
47
48impl Default for SchemaNameKey<'_> {
49 fn default() -> Self {
50 Self {
51 catalog: DEFAULT_CATALOG_NAME,
52 schema: DEFAULT_SCHEMA_NAME,
53 }
54 }
55}
56
/// Options stored as the value of a schema name key.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
    /// Optional time-to-live setting. `#[serde(default)]` allows the field to
    /// be absent from the serialized form, in which case it becomes `None`.
    #[serde(default)]
    pub ttl: Option<DatabaseTimeToLive>,
}
62
63impl Display for SchemaNameValue {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
66 write!(f, "ttl='{}'", ttl)?;
67 }
68
69 Ok(())
70 }
71}
72
73impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
74 type Error = Error;
75
76 fn try_from(value: &HashMap<String, String>) -> std::result::Result<Self, Self::Error> {
77 let ttl = value
78 .get(OPT_KEY_TTL)
79 .map(|ttl_str| {
80 ttl_str.parse::<humantime::Duration>().map_err(|_| {
81 ParseOptionSnafu {
82 key: OPT_KEY_TTL,
83 value: ttl_str.clone(),
84 }
85 .build()
86 })
87 })
88 .transpose()?
89 .map(|ttl| ttl.into());
90 Ok(Self { ttl })
91 }
92}
93
94impl From<SchemaNameValue> for HashMap<String, String> {
95 fn from(value: SchemaNameValue) -> Self {
96 let mut opts = HashMap::new();
97 if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
98 opts.insert(OPT_KEY_TTL.to_string(), ttl);
99 }
100 opts
101 }
102}
103
104impl<'a> SchemaNameKey<'a> {
105 pub fn new(catalog: &'a str, schema: &'a str) -> Self {
106 Self { catalog, schema }
107 }
108
109 pub fn range_start_key(catalog: &str) -> String {
110 format!("{}/{}/", SCHEMA_NAME_KEY_PREFIX, catalog)
111 }
112}
113
114impl Display for SchemaNameKey<'_> {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 write!(
117 f,
118 "{}/{}/{}",
119 SCHEMA_NAME_KEY_PREFIX, self.catalog, self.schema
120 )
121 }
122}
123
124impl<'a> MetadataKey<'a, SchemaNameKey<'a>> for SchemaNameKey<'_> {
125 fn to_bytes(&self) -> Vec<u8> {
126 self.to_string().into_bytes()
127 }
128
129 fn from_bytes(bytes: &'a [u8]) -> Result<SchemaNameKey<'a>> {
130 let key = std::str::from_utf8(bytes).map_err(|e| {
131 InvalidMetadataSnafu {
132 err_msg: format!(
133 "SchemaNameKey '{}' is not a valid UTF8 string: {e}",
134 String::from_utf8_lossy(bytes)
135 ),
136 }
137 .build()
138 })?;
139 SchemaNameKey::try_from(key)
140 }
141}
142
143pub fn schema_decoder(kv: KeyValue) -> Result<String> {
145 let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
146 let schema_name = SchemaNameKey::try_from(str)?;
147
148 Ok(schema_name.schema.to_string())
149}
150
151impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
152 type Error = Error;
153
154 fn try_from(s: &'a str) -> Result<Self> {
155 let captures = SCHEMA_NAME_KEY_PATTERN
156 .captures(s)
157 .context(InvalidMetadataSnafu {
158 err_msg: format!("Illegal SchemaNameKey format: '{s}'"),
159 })?;
160
161 Ok(Self {
163 catalog: captures.get(1).unwrap().as_str(),
164 schema: captures.get(2).unwrap().as_str(),
165 })
166 }
167}
168
/// Manages schema name entries stored in a key-value backend.
#[derive(Clone)]
pub struct SchemaManager {
    /// Backend through which every schema read and write is issued.
    kv_backend: KvBackendRef,
}
173
/// Result type of the decoder closure returned by
/// `SchemaManager::build_update_txn`: an optional schema name value together
/// with its raw bytes.
pub type SchemaNameDecodeResult = Result<Option<DeserializedValueWithBytes<SchemaNameValue>>>;
175
176impl SchemaManager {
177 pub fn new(kv_backend: KvBackendRef) -> Self {
178 Self { kv_backend }
179 }
180
181 pub async fn create(
183 &self,
184 schema: SchemaNameKey<'_>,
185 value: Option<SchemaNameValue>,
186 if_not_exists: bool,
187 ) -> Result<()> {
188 let _timer = crate::metrics::METRIC_META_CREATE_SCHEMA.start_timer();
189
190 let raw_key = schema.to_bytes();
191 let raw_value = value.unwrap_or_default().try_as_raw_value()?;
192 if self
193 .kv_backend
194 .put_conditionally(raw_key, raw_value, if_not_exists)
195 .await?
196 {
197 crate::metrics::METRIC_META_CREATE_SCHEMA_COUNTER.inc();
198 }
199
200 Ok(())
201 }
202
203 pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
204 let raw_key = schema.to_bytes();
205
206 self.kv_backend.exists(&raw_key).await
207 }
208
209 pub async fn get(
210 &self,
211 schema: SchemaNameKey<'_>,
212 ) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
213 let raw_key = schema.to_bytes();
214 self.kv_backend
215 .get(&raw_key)
216 .await?
217 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
218 .transpose()
219 }
220
221 pub async fn delete(&self, schema: SchemaNameKey<'_>) -> Result<()> {
223 let raw_key = schema.to_bytes();
224 self.kv_backend.delete(&raw_key, false).await?;
225
226 Ok(())
227 }
228
229 pub(crate) fn build_update_txn(
230 &self,
231 schema: SchemaNameKey<'_>,
232 current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
233 new_schema_value: &SchemaNameValue,
234 ) -> Result<(
235 Txn,
236 impl FnOnce(&mut TxnOpGetResponseSet) -> SchemaNameDecodeResult,
237 )> {
238 let raw_key = schema.to_bytes();
239 let raw_value = current_schema_value.get_raw_bytes();
240 let new_raw_value: Vec<u8> = new_schema_value.try_as_raw_value()?;
241
242 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
243
244 Ok((
245 txn,
246 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
247 ))
248 }
249
250 pub async fn update(
252 &self,
253 schema: SchemaNameKey<'_>,
254 current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
255 new_schema_value: &SchemaNameValue,
256 ) -> Result<()> {
257 let (txn, on_failure) =
258 self.build_update_txn(schema, current_schema_value, new_schema_value)?;
259 let mut r = self.kv_backend.txn(txn).await?;
260
261 if !r.succeeded {
262 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
263 let remote_schema_value = on_failure(&mut set)?
264 .context(error::UnexpectedSnafu {
265 err_msg:
266 "Reads the empty schema name value in comparing operation of updating schema name value",
267 })?
268 .into_inner();
269
270 let op_name = "the updating schema name value";
271 ensure_values!(&remote_schema_value, new_schema_value, op_name);
272 }
273
274 Ok(())
275 }
276
277 pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
279 let start_key = SchemaNameKey::range_start_key(catalog);
280 let req = RangeRequest::new().with_prefix(start_key.as_bytes());
281
282 let stream = PaginationStream::new(
283 self.kv_backend.clone(),
284 req,
285 DEFAULT_PAGE_SIZE,
286 schema_decoder,
287 )
288 .into_stream();
289
290 Box::pin(stream)
291 }
292}
293
/// Owned, hashable and serializable pair of a catalog name and a schema name.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct SchemaName {
    /// Name of the catalog.
    pub catalog_name: String,
    /// Name of the schema.
    pub schema_name: String,
}
299
300impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
301 fn from(value: &'a SchemaName) -> Self {
302 Self {
303 catalog: &value.catalog_name,
304 schema: &value.schema_name,
305 }
306 }
307}
308
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// `Display` prints nothing without a TTL and `ttl='…'` otherwise.
    #[test]
    fn test_display_schema_value() {
        let schema_value = SchemaNameValue { ttl: None };
        assert_eq!("", schema_value.to_string());

        let schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(9).into()),
        };
        assert_eq!("ttl='9s'", schema_value.to_string());

        // A zero duration is rendered as `forever`.
        let schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(0).into()),
        };
        assert_eq!("ttl='forever'", schema_value.to_string());
    }

    /// Round-trips keys through bytes and values through option maps / raw JSON.
    #[test]
    fn test_serialization() {
        let key = SchemaNameKey::new("my-catalog", "my-schema");
        assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema");

        let parsed = SchemaNameKey::from_bytes(b"__schema_name/my-catalog/my-schema").unwrap();

        assert_eq!(key, parsed);

        let value = SchemaNameValue {
            ttl: Some(Duration::from_secs(10).into()),
        };
        let mut opts: HashMap<String, String> = HashMap::new();
        opts.insert("ttl".to_string(), "10s".to_string());
        let from_value = SchemaNameValue::try_from(&opts).unwrap();
        assert_eq!(value, from_value);

        let parsed = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(value), parsed);

        let forever = SchemaNameValue {
            ttl: Some(Default::default()),
        };
        let parsed = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
        )
        .unwrap();
        assert_eq!(Some(forever), parsed);

        // `instant` is not accepted as a schema-level TTL.
        let instant_err = SchemaNameValue::try_from_raw_value(
            serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
        );
        assert!(instant_err.is_err());

        let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
        assert!(none.is_none());

        let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
        assert!(err_empty.is_err());
    }

    /// `exists` is true for a created key and false for one never created.
    #[tokio::test]
    async fn test_key_exist() {
        let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
        let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(schema_key, None, false).await.unwrap();

        assert!(manager.exists(schema_key).await.unwrap());

        let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong");

        assert!(!manager.exists(wrong_schema_key).await.unwrap());
    }

    /// Exercises `update` on the success, repeated, conflicting and reset paths.
    #[tokio::test]
    async fn test_update_schema_value() {
        let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
        let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
        manager.create(schema_key, None, false).await.unwrap();

        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        let new_schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(10).into()),
        };
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        // Repeating the same update with the now-stale current value is still
        // accepted, because the stored value already equals the new one.
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        // A stale current value combined with a different new value must fail.
        let new_schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(40).into()),
        };
        let incorrect_schema_value = SchemaNameValue {
            ttl: Some(Duration::from_secs(20).into()),
        }
        .try_as_raw_value()
        .unwrap();
        let incorrect_schema_value =
            DeserializedValueWithBytes::from_inner_slice(&incorrect_schema_value).unwrap();

        manager
            .update(schema_key, &incorrect_schema_value, &new_schema_value)
            .await
            .unwrap_err();

        // Clearing the TTL with an up-to-date current value succeeds.
        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        let new_schema_value = SchemaNameValue { ttl: None };
        manager
            .update(schema_key, &current_schema_value, &new_schema_value)
            .await
            .unwrap();

        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
        assert_eq!(new_schema_value, *current_schema_value);
    }
}