common_meta/key/
table_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::Arc;
18
19use futures_util::stream::BoxStream;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23use table::table_name::TableName;
24use table::table_reference::TableReference;
25
26use crate::error::{Error, InvalidMetadataSnafu, Result};
27use crate::key::{MetadataKey, MetadataValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
28use crate::kv_backend::memory::MemoryKvBackend;
29use crate::kv_backend::txn::{Txn, TxnOp};
30use crate::kv_backend::KvBackendRef;
31use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
32use crate::rpc::store::{BatchGetRequest, RangeRequest};
33use crate::rpc::KeyValue;
34
35#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
36pub struct TableNameKey<'a> {
37    pub catalog: &'a str,
38    pub schema: &'a str,
39    pub table: &'a str,
40}
41
42impl<'a> TableNameKey<'a> {
43    pub fn new(catalog: &'a str, schema: &'a str, table: &'a str) -> Self {
44        Self {
45            catalog,
46            schema,
47            table,
48        }
49    }
50
51    pub fn prefix_to_table(catalog: &str, schema: &str) -> String {
52        format!("{}/{}/{}/", TABLE_NAME_KEY_PREFIX, catalog, schema)
53    }
54}
55
56impl Display for TableNameKey<'_> {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        write!(
59            f,
60            "{}{}",
61            Self::prefix_to_table(self.catalog, self.schema),
62            self.table
63        )
64    }
65}
66
67impl<'a> MetadataKey<'a, TableNameKey<'a>> for TableNameKey<'_> {
68    fn to_bytes(&self) -> Vec<u8> {
69        self.to_string().into_bytes()
70    }
71
72    fn from_bytes(bytes: &'a [u8]) -> Result<TableNameKey<'a>> {
73        let key = std::str::from_utf8(bytes).map_err(|e| {
74            InvalidMetadataSnafu {
75                err_msg: format!(
76                    "TableNameKey '{}' is not a valid UTF8 string: {e}",
77                    String::from_utf8_lossy(bytes)
78                ),
79            }
80            .build()
81        })?;
82        let captures = TABLE_NAME_KEY_PATTERN
83            .captures(key)
84            .context(InvalidMetadataSnafu {
85                err_msg: format!("Invalid TableNameKey '{key}'"),
86            })?;
87        let catalog = captures.get(1).unwrap().as_str();
88        let schema = captures.get(2).unwrap().as_str();
89        let table = captures.get(3).unwrap().as_str();
90        Ok(TableNameKey {
91            catalog,
92            schema,
93            table,
94        })
95    }
96}
97
98/// Decodes `KeyValue` to ({table_name}, TableNameValue)
99pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> {
100    let table_name_key = TableNameKey::from_bytes(&kv.key)?;
101    let table_name_value = TableNameValue::try_from_raw_value(&kv.value)?;
102
103    Ok((table_name_key.table.to_string(), table_name_value))
104}
105
106impl<'a> From<&TableReference<'a>> for TableNameKey<'a> {
107    fn from(value: &TableReference<'a>) -> Self {
108        Self {
109            catalog: value.catalog,
110            schema: value.schema,
111            table: value.table,
112        }
113    }
114}
115
116impl<'a> From<TableReference<'a>> for TableNameKey<'a> {
117    fn from(value: TableReference<'a>) -> Self {
118        Self {
119            catalog: value.catalog,
120            schema: value.schema,
121            table: value.table,
122        }
123    }
124}
125
126impl<'a> From<&'a TableName> for TableNameKey<'a> {
127    fn from(value: &'a TableName) -> Self {
128        Self {
129            catalog: &value.catalog_name,
130            schema: &value.schema_name,
131            table: &value.table_name,
132        }
133    }
134}
135
136impl From<TableNameKey<'_>> for TableName {
137    fn from(value: TableNameKey<'_>) -> Self {
138        Self {
139            catalog_name: value.catalog.to_string(),
140            schema_name: value.schema.to_string(),
141            table_name: value.table.to_string(),
142        }
143    }
144}
145
146impl<'a> From<TableNameKey<'a>> for TableReference<'a> {
147    fn from(value: TableNameKey<'a>) -> Self {
148        Self {
149            catalog: value.catalog,
150            schema: value.schema,
151            table: value.table,
152        }
153    }
154}
155
156impl<'a> TryFrom<&'a str> for TableNameKey<'a> {
157    type Error = Error;
158
159    fn try_from(s: &'a str) -> Result<Self> {
160        let captures = TABLE_NAME_KEY_PATTERN
161            .captures(s)
162            .context(InvalidMetadataSnafu {
163                err_msg: format!("Illegal TableNameKey format: '{s}'"),
164            })?;
165        // Safety: pass the regex check above
166        Ok(Self {
167            catalog: captures.get(1).unwrap().as_str(),
168            schema: captures.get(2).unwrap().as_str(),
169            table: captures.get(3).unwrap().as_str(),
170        })
171    }
172}
173
174#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
175pub struct TableNameValue {
176    table_id: TableId,
177}
178
179impl TableNameValue {
180    pub fn new(table_id: TableId) -> Self {
181        Self { table_id }
182    }
183
184    pub fn table_id(&self) -> TableId {
185        self.table_id
186    }
187}
188
189pub type TableNameManagerRef = Arc<TableNameManager>;
190
191#[derive(Clone)]
192pub struct TableNameManager {
193    kv_backend: KvBackendRef,
194}
195
196impl Default for TableNameManager {
197    fn default() -> Self {
198        Self::new(Arc::new(MemoryKvBackend::default()))
199    }
200}
201
202impl TableNameManager {
203    pub fn new(kv_backend: KvBackendRef) -> Self {
204        Self { kv_backend }
205    }
206
207    /// Builds a create table name transaction. It only executes while the primary keys comparing successes.
208    pub(crate) fn build_create_txn(
209        &self,
210        key: &TableNameKey<'_>,
211        table_id: TableId,
212    ) -> Result<Txn> {
213        let raw_key = key.to_bytes();
214        let value = TableNameValue::new(table_id);
215        let raw_value = value.try_as_raw_value()?;
216
217        let txn = Txn::new().and_then(vec![TxnOp::Put(raw_key, raw_value)]);
218
219        Ok(txn)
220    }
221
222    /// Builds a update table name transaction. It only executes while the primary keys comparing successes.
223    pub(crate) fn build_update_txn(
224        &self,
225        key: &TableNameKey<'_>,
226        new_key: &TableNameKey<'_>,
227        table_id: TableId,
228    ) -> Result<Txn> {
229        let raw_key = key.to_bytes();
230        let new_raw_key = new_key.to_bytes();
231        let value = TableNameValue::new(table_id);
232        let raw_value = value.try_as_raw_value()?;
233
234        let txn = Txn::new().and_then(vec![
235            TxnOp::Delete(raw_key),
236            TxnOp::Put(new_raw_key, raw_value),
237        ]);
238        Ok(txn)
239    }
240
241    pub async fn get(&self, key: TableNameKey<'_>) -> Result<Option<TableNameValue>> {
242        let raw_key = key.to_bytes();
243        self.kv_backend
244            .get(&raw_key)
245            .await?
246            .map(|x| TableNameValue::try_from_raw_value(&x.value))
247            .transpose()
248    }
249
250    pub async fn batch_get(
251        &self,
252        keys: Vec<TableNameKey<'_>>,
253    ) -> Result<Vec<Option<TableNameValue>>> {
254        let raw_keys = keys
255            .into_iter()
256            .map(|key| key.to_bytes())
257            .collect::<Vec<_>>();
258        let req = BatchGetRequest::new().with_keys(raw_keys.clone());
259        let res = self.kv_backend.batch_get(req).await?;
260        let kvs = res
261            .kvs
262            .into_iter()
263            .map(|kv| (kv.key, kv.value))
264            .collect::<HashMap<_, _>>();
265        let mut array = vec![None; raw_keys.len()];
266        for (i, key) in raw_keys.into_iter().enumerate() {
267            let v = kvs.get(&key);
268            array[i] = v
269                .map(|v| TableNameValue::try_from_raw_value(v))
270                .transpose()?;
271        }
272        Ok(array)
273    }
274
275    pub async fn exists(&self, key: TableNameKey<'_>) -> Result<bool> {
276        let raw_key = key.to_bytes();
277        self.kv_backend.exists(&raw_key).await
278    }
279
280    pub fn tables(
281        &self,
282        catalog: &str,
283        schema: &str,
284    ) -> BoxStream<'static, Result<(String, TableNameValue)>> {
285        let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes();
286        let req = RangeRequest::new().with_prefix(key);
287
288        let stream = PaginationStream::new(
289            self.kv_backend.clone(),
290            req,
291            DEFAULT_PAGE_SIZE,
292            table_decoder,
293        )
294        .into_stream();
295
296        Box::pin(stream)
297    }
298}
299
300#[cfg(test)]
301mod tests {
302
303    use futures::StreamExt;
304
305    use super::*;
306    use crate::kv_backend::KvBackend;
307    use crate::rpc::store::PutRequest;
308
309    #[test]
310    fn test_strip_table_name() {
311        fn test_err(bytes: &[u8]) {
312            assert!(TableNameKey::from_bytes(bytes).is_err());
313        }
314
315        test_err(b"");
316        test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string
317        test_err(b"invalid_prefix/my_catalog/my_schema/my_table");
318        test_err(b"__table_name/");
319        test_err(b"__table_name/invalid_len_1");
320        test_err(b"__table_name/invalid_len_2/x");
321        test_err(b"__table_name/invalid_len_4/x/y/z");
322        test_err(b"__table_name/000_invalid_catalog/y/z");
323        test_err(b"__table_name/x/000_invalid_schema/z");
324        test_err(b"__table_name/x/y/000_invalid_table");
325
326        fn test_ok(table_name: &str) {
327            assert_eq!(
328                table_name,
329                TableNameKey::from_bytes(
330                    format!("__table_name/my_catalog/my_schema/{}", table_name).as_bytes()
331                )
332                .unwrap()
333                .table
334            );
335        }
336        test_ok("my_table");
337        test_ok("cpu:metrics");
338        test_ok(":cpu:metrics");
339        test_ok("sys.cpu.system");
340        test_ok("foo-bar");
341    }
342
343    #[test]
344    fn test_serialization() {
345        let key = TableNameKey::new("my_catalog", "my_schema", "my_table");
346        let raw_key = key.to_bytes();
347        assert_eq!(
348            b"__table_name/my_catalog/my_schema/my_table",
349            raw_key.as_slice()
350        );
351        let table_name_key =
352            TableNameKey::from_bytes(b"__table_name/my_catalog/my_schema/my_table").unwrap();
353        assert_eq!(table_name_key.catalog, "my_catalog");
354        assert_eq!(table_name_key.schema, "my_schema");
355        assert_eq!(table_name_key.table, "my_table");
356
357        let value = TableNameValue::new(1);
358        let literal = br#"{"table_id":1}"#;
359
360        assert_eq!(value.try_as_raw_value().unwrap(), literal);
361        assert_eq!(TableNameValue::try_from_raw_value(literal).unwrap(), value);
362    }
363
364    #[tokio::test]
365    async fn test_prefix_scan_tables() {
366        let memory_kv = Arc::new(MemoryKvBackend::<crate::error::Error>::new());
367        memory_kv
368            .put(PutRequest {
369                key: TableNameKey {
370                    catalog: "greptime",
371                    schema: "👉",
372                    table: "t",
373                }
374                .to_bytes(),
375                value: vec![],
376                prev_kv: false,
377            })
378            .await
379            .unwrap();
380        memory_kv
381            .put(PutRequest {
382                key: TableNameKey {
383                    catalog: "greptime",
384                    schema: "👉👈",
385                    table: "t",
386                }
387                .to_bytes(),
388                value: vec![],
389                prev_kv: false,
390            })
391            .await
392            .unwrap();
393
394        let manager = TableNameManager::new(memory_kv);
395        let items = manager.tables("greptime", "👉").collect::<Vec<_>>().await;
396        assert_eq!(items.len(), 1);
397    }
398}