1use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::Arc;
18
19use futures_util::stream::BoxStream;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23use table::table_name::TableName;
24use table::table_reference::TableReference;
25
26use crate::error::{Error, InvalidMetadataSnafu, Result};
27use crate::key::{MetadataKey, MetadataValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
28use crate::kv_backend::memory::MemoryKvBackend;
29use crate::kv_backend::txn::{Txn, TxnOp};
30use crate::kv_backend::KvBackendRef;
31use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
32use crate::rpc::store::{BatchGetRequest, RangeRequest};
33use crate::rpc::KeyValue;
34
/// Borrowed key that names a table by its catalog, schema and table name.
///
/// Its `Display` form, which is also what `to_bytes` serializes, is
/// `{TABLE_NAME_KEY_PREFIX}/{catalog}/{schema}/{table}`.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct TableNameKey<'a> {
    /// Catalog component of the key.
    pub catalog: &'a str,
    /// Schema component of the key.
    pub schema: &'a str,
    /// Table-name component of the key.
    pub table: &'a str,
}
41
42impl<'a> TableNameKey<'a> {
43 pub fn new(catalog: &'a str, schema: &'a str, table: &'a str) -> Self {
44 Self {
45 catalog,
46 schema,
47 table,
48 }
49 }
50
51 pub fn prefix_to_table(catalog: &str, schema: &str) -> String {
52 format!("{}/{}/{}/", TABLE_NAME_KEY_PREFIX, catalog, schema)
53 }
54}
55
56impl Display for TableNameKey<'_> {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 write!(
59 f,
60 "{}{}",
61 Self::prefix_to_table(self.catalog, self.schema),
62 self.table
63 )
64 }
65}
66
67impl<'a> MetadataKey<'a, TableNameKey<'a>> for TableNameKey<'_> {
68 fn to_bytes(&self) -> Vec<u8> {
69 self.to_string().into_bytes()
70 }
71
72 fn from_bytes(bytes: &'a [u8]) -> Result<TableNameKey<'a>> {
73 let key = std::str::from_utf8(bytes).map_err(|e| {
74 InvalidMetadataSnafu {
75 err_msg: format!(
76 "TableNameKey '{}' is not a valid UTF8 string: {e}",
77 String::from_utf8_lossy(bytes)
78 ),
79 }
80 .build()
81 })?;
82 let captures = TABLE_NAME_KEY_PATTERN
83 .captures(key)
84 .context(InvalidMetadataSnafu {
85 err_msg: format!("Invalid TableNameKey '{key}'"),
86 })?;
87 let catalog = captures.get(1).unwrap().as_str();
88 let schema = captures.get(2).unwrap().as_str();
89 let table = captures.get(3).unwrap().as_str();
90 Ok(TableNameKey {
91 catalog,
92 schema,
93 table,
94 })
95 }
96}
97
98pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> {
100 let table_name_key = TableNameKey::from_bytes(&kv.key)?;
101 let table_name_value = TableNameValue::try_from_raw_value(&kv.value)?;
102
103 Ok((table_name_key.table.to_string(), table_name_value))
104}
105
106impl<'a> From<&'a TableName> for TableNameKey<'a> {
107 fn from(value: &'a TableName) -> Self {
108 Self {
109 catalog: &value.catalog_name,
110 schema: &value.schema_name,
111 table: &value.table_name,
112 }
113 }
114}
115
116impl From<TableNameKey<'_>> for TableName {
117 fn from(value: TableNameKey<'_>) -> Self {
118 Self {
119 catalog_name: value.catalog.to_string(),
120 schema_name: value.schema.to_string(),
121 table_name: value.table.to_string(),
122 }
123 }
124}
125
126impl<'a> From<TableNameKey<'a>> for TableReference<'a> {
127 fn from(value: TableNameKey<'a>) -> Self {
128 Self {
129 catalog: value.catalog,
130 schema: value.schema,
131 table: value.table,
132 }
133 }
134}
135
136impl<'a> TryFrom<&'a str> for TableNameKey<'a> {
137 type Error = Error;
138
139 fn try_from(s: &'a str) -> Result<Self> {
140 let captures = TABLE_NAME_KEY_PATTERN
141 .captures(s)
142 .context(InvalidMetadataSnafu {
143 err_msg: format!("Illegal TableNameKey format: '{s}'"),
144 })?;
145 Ok(Self {
147 catalog: captures.get(1).unwrap().as_str(),
148 schema: captures.get(2).unwrap().as_str(),
149 table: captures.get(3).unwrap().as_str(),
150 })
151 }
152}
153
/// Value stored under a [`TableNameKey`]: the id of the table that the name refers to.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct TableNameValue {
    /// Id of the table; read through [`TableNameValue::table_id`].
    table_id: TableId,
}
158
159impl TableNameValue {
160 pub fn new(table_id: TableId) -> Self {
161 Self { table_id }
162 }
163
164 pub fn table_id(&self) -> TableId {
165 self.table_id
166 }
167}
168
/// Shared, reference-counted handle to a [`TableNameManager`].
pub type TableNameManagerRef = Arc<TableNameManager>;
170
/// Looks up table-name entries in a key-value backend and builds the transactions
/// that create or rename them.
#[derive(Clone)]
pub struct TableNameManager {
    /// Backend queried by the lookup methods.
    kv_backend: KvBackendRef,
}
175
176impl Default for TableNameManager {
177 fn default() -> Self {
178 Self::new(Arc::new(MemoryKvBackend::default()))
179 }
180}
181
182impl TableNameManager {
183 pub fn new(kv_backend: KvBackendRef) -> Self {
184 Self { kv_backend }
185 }
186
187 pub(crate) fn build_create_txn(
189 &self,
190 key: &TableNameKey<'_>,
191 table_id: TableId,
192 ) -> Result<Txn> {
193 let raw_key = key.to_bytes();
194 let value = TableNameValue::new(table_id);
195 let raw_value = value.try_as_raw_value()?;
196
197 let txn = Txn::new().and_then(vec![TxnOp::Put(raw_key, raw_value)]);
198
199 Ok(txn)
200 }
201
202 pub(crate) fn build_update_txn(
204 &self,
205 key: &TableNameKey<'_>,
206 new_key: &TableNameKey<'_>,
207 table_id: TableId,
208 ) -> Result<Txn> {
209 let raw_key = key.to_bytes();
210 let new_raw_key = new_key.to_bytes();
211 let value = TableNameValue::new(table_id);
212 let raw_value = value.try_as_raw_value()?;
213
214 let txn = Txn::new().and_then(vec![
215 TxnOp::Delete(raw_key),
216 TxnOp::Put(new_raw_key, raw_value),
217 ]);
218 Ok(txn)
219 }
220
221 pub async fn get(&self, key: TableNameKey<'_>) -> Result<Option<TableNameValue>> {
222 let raw_key = key.to_bytes();
223 self.kv_backend
224 .get(&raw_key)
225 .await?
226 .map(|x| TableNameValue::try_from_raw_value(&x.value))
227 .transpose()
228 }
229
230 pub async fn batch_get(
231 &self,
232 keys: Vec<TableNameKey<'_>>,
233 ) -> Result<Vec<Option<TableNameValue>>> {
234 let raw_keys = keys
235 .into_iter()
236 .map(|key| key.to_bytes())
237 .collect::<Vec<_>>();
238 let req = BatchGetRequest::new().with_keys(raw_keys.clone());
239 let res = self.kv_backend.batch_get(req).await?;
240 let kvs = res
241 .kvs
242 .into_iter()
243 .map(|kv| (kv.key, kv.value))
244 .collect::<HashMap<_, _>>();
245 let mut array = vec![None; raw_keys.len()];
246 for (i, key) in raw_keys.into_iter().enumerate() {
247 let v = kvs.get(&key);
248 array[i] = v
249 .map(|v| TableNameValue::try_from_raw_value(v))
250 .transpose()?;
251 }
252 Ok(array)
253 }
254
255 pub async fn exists(&self, key: TableNameKey<'_>) -> Result<bool> {
256 let raw_key = key.to_bytes();
257 self.kv_backend.exists(&raw_key).await
258 }
259
260 pub fn tables(
261 &self,
262 catalog: &str,
263 schema: &str,
264 ) -> BoxStream<'static, Result<(String, TableNameValue)>> {
265 let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes();
266 let req = RangeRequest::new().with_prefix(key);
267
268 let stream = PaginationStream::new(
269 self.kv_backend.clone(),
270 req,
271 DEFAULT_PAGE_SIZE,
272 table_decoder,
273 )
274 .into_stream();
275
276 Box::pin(stream)
277 }
278}
279
#[cfg(test)]
mod tests {

    use futures::StreamExt;

    use super::*;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    #[test]
    fn test_strip_table_name() {
        // Inputs that must be rejected: empty, non-UTF-8, wrong prefix, wrong number
        // of components, and components that are not accepted names.
        let rejected: [&[u8]; 10] = [
            b"",
            &[0u8, 159, 146, 150],
            b"invalid_prefix/my_catalog/my_schema/my_table",
            b"__table_name/",
            b"__table_name/invalid_len_1",
            b"__table_name/invalid_len_2/x",
            b"__table_name/invalid_len_4/x/y/z",
            b"__table_name/000_invalid_catalog/y/z",
            b"__table_name/x/000_invalid_schema/z",
            b"__table_name/x/y/000_invalid_table",
        ];
        for bytes in rejected {
            assert!(TableNameKey::from_bytes(bytes).is_err());
        }

        // Table names that must survive a parse unchanged.
        let accepted = [
            "my_table",
            "cpu:metrics",
            ":cpu:metrics",
            "sys.cpu.system",
            "foo-bar",
        ];
        for table_name in accepted {
            let raw = format!("__table_name/my_catalog/my_schema/{}", table_name);
            let parsed = TableNameKey::from_bytes(raw.as_bytes()).unwrap();
            assert_eq!(table_name, parsed.table);
        }
    }

    #[test]
    fn test_serialization() {
        let expected_raw_key = b"__table_name/my_catalog/my_schema/my_table";

        // Key -> bytes.
        let raw_key = TableNameKey::new("my_catalog", "my_schema", "my_table").to_bytes();
        assert_eq!(expected_raw_key, raw_key.as_slice());

        // Bytes -> key.
        let table_name_key = TableNameKey::from_bytes(expected_raw_key).unwrap();
        assert_eq!(table_name_key.catalog, "my_catalog");
        assert_eq!(table_name_key.schema, "my_schema");
        assert_eq!(table_name_key.table, "my_table");

        // Value <-> bytes.
        let value = TableNameValue::new(1);
        let literal = br#"{"table_id":1}"#;

        assert_eq!(value.try_as_raw_value().unwrap(), literal);
        assert_eq!(TableNameValue::try_from_raw_value(literal).unwrap(), value);
    }

    #[tokio::test]
    async fn test_prefix_scan_tables() {
        let memory_kv = Arc::new(MemoryKvBackend::<crate::error::Error>::new());

        // Two schemas where one name is a strict prefix of the other.
        for schema in ["👉", "👉👈"] {
            let key = TableNameKey::new("greptime", schema, "t").to_bytes();
            memory_kv
                .put(PutRequest {
                    key,
                    value: vec![],
                    prev_kv: false,
                })
                .await
                .unwrap();
        }

        // The scan prefix ends with '/', so only the exact schema may match.
        let manager = TableNameManager::new(memory_kv);
        let items = manager.tables("greptime", "👉").collect::<Vec<_>>().await;
        assert_eq!(items.len(), 1);
    }
}