common_meta/key/
table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::{RawTableInfo, TableId};
22use table::table_name::TableName;
23use table::table_reference::TableReference;
24
25use crate::ddl::utils::region_storage_path;
26use crate::error::{InvalidMetadataSnafu, Result};
27use crate::key::txn_helper::TxnOpGetResponseSet;
28use crate::key::{
29    DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_INFO_KEY_PATTERN,
30    TABLE_INFO_KEY_PREFIX,
31};
32use crate::kv_backend::txn::Txn;
33use crate::kv_backend::KvBackendRef;
34use crate::rpc::store::BatchGetRequest;
35
/// The key stores the metadata of the table.
///
/// The layout: `__table_info/{table_id}`.
#[derive(Debug, PartialEq)]
pub struct TableInfoKey {
    /// Id of the table this key addresses; it is the `{table_id}` part of the layout.
    table_id: TableId,
}
43
44impl TableInfoKey {
45    /// Returns a new [TableInfoKey].
46    pub fn new(table_id: TableId) -> Self {
47        Self { table_id }
48    }
49}
50
51impl Display for TableInfoKey {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(f, "{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id)
54    }
55}
56
57impl MetadataKey<'_, TableInfoKey> for TableInfoKey {
58    fn to_bytes(&self) -> Vec<u8> {
59        self.to_string().into_bytes()
60    }
61
62    fn from_bytes(bytes: &[u8]) -> Result<TableInfoKey> {
63        let key = std::str::from_utf8(bytes).map_err(|e| {
64            InvalidMetadataSnafu {
65                err_msg: format!(
66                    "TableInfoKey '{}' is not a valid UTF8 string: {e}",
67                    String::from_utf8_lossy(bytes)
68                ),
69            }
70            .build()
71        })?;
72        let captures = TABLE_INFO_KEY_PATTERN
73            .captures(key)
74            .context(InvalidMetadataSnafu {
75                err_msg: format!("Invalid TableInfoKey '{key}'"),
76            })?;
77        // Safety: pass the regex check above
78        let table_id = captures[1].parse::<TableId>().unwrap();
79        Ok(TableInfoKey { table_id })
80    }
81}
82
/// The value holding a table's raw info together with a version counter.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableInfoValue {
    /// The raw table info carried by this value.
    pub table_info: RawTableInfo,
    /// Version counter of this value: `new` starts it at 0, and `update` /
    /// `with_update` return a value whose counter is one higher.
    version: u64,
}
88
89impl TableInfoValue {
90    pub fn new(table_info: RawTableInfo) -> Self {
91        Self {
92            table_info,
93            version: 0,
94        }
95    }
96
97    pub(crate) fn update(&self, new_table_info: RawTableInfo) -> Self {
98        Self {
99            table_info: new_table_info,
100            version: self.version + 1,
101        }
102    }
103
104    pub(crate) fn with_update<F>(&self, update: F) -> Self
105    where
106        F: FnOnce(&mut RawTableInfo),
107    {
108        let mut new_table_info = self.table_info.clone();
109        update(&mut new_table_info);
110        Self {
111            table_info: new_table_info,
112            version: self.version + 1,
113        }
114    }
115
116    pub fn table_ref(&self) -> TableReference {
117        TableReference::full(
118            &self.table_info.catalog_name,
119            &self.table_info.schema_name,
120            &self.table_info.name,
121        )
122    }
123
124    pub fn table_name(&self) -> TableName {
125        TableName {
126            catalog_name: self.table_info.catalog_name.to_string(),
127            schema_name: self.table_info.schema_name.to_string(),
128            table_name: self.table_info.name.to_string(),
129        }
130    }
131
132    /// Builds storage path for all regions in table.
133    pub fn region_storage_path(&self) -> String {
134        region_storage_path(&self.table_info.catalog_name, &self.table_info.schema_name)
135    }
136}
137
/// A shared, reference-counted handle to a [TableInfoManager].
pub type TableInfoManagerRef = Arc<TableInfoManager>;
139
/// Reads table info values from, and builds table info transactions for, a key-value backend.
#[derive(Clone)]
pub struct TableInfoManager {
    /// The backend every lookup of this manager is issued against.
    kv_backend: KvBackendRef,
}
/// The result produced by the decoders returned from the `build_*_txn` methods:
/// an optional table info value together with its raw bytes.
pub type TableInfoDecodeResult = Result<Option<DeserializedValueWithBytes<TableInfoValue>>>;
145
146impl TableInfoManager {
147    pub fn new(kv_backend: KvBackendRef) -> Self {
148        Self { kv_backend }
149    }
150
151    /// Builds a create table info transaction, it expected the `__table_info/{table_id}` wasn't occupied.
152    pub(crate) fn build_create_txn(
153        &self,
154        table_id: TableId,
155        table_info_value: &TableInfoValue,
156    ) -> Result<(
157        Txn,
158        impl FnOnce(&mut TxnOpGetResponseSet) -> TableInfoDecodeResult,
159    )> {
160        let key = TableInfoKey::new(table_id);
161        let raw_key = key.to_bytes();
162
163        let txn = Txn::put_if_not_exists(raw_key.clone(), table_info_value.try_as_raw_value()?);
164
165        Ok((
166            txn,
167            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
168        ))
169    }
170
171    /// Builds a update table info transaction, it expected the remote value equals the `current_current_table_info_value`.
172    /// It retrieves the latest value if the comparing failed.
173    pub(crate) fn build_update_txn(
174        &self,
175        table_id: TableId,
176        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
177        new_table_info_value: &TableInfoValue,
178    ) -> Result<(
179        Txn,
180        impl FnOnce(&mut TxnOpGetResponseSet) -> TableInfoDecodeResult,
181    )> {
182        let key = TableInfoKey::new(table_id);
183        let raw_key = key.to_bytes();
184        let raw_value = current_table_info_value.get_raw_bytes();
185        let new_raw_value: Vec<u8> = new_table_info_value.try_as_raw_value()?;
186
187        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
188
189        Ok((
190            txn,
191            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
192        ))
193    }
194
195    /// Checks if the table exists.
196    pub async fn exists(&self, table_id: TableId) -> Result<bool> {
197        let key = TableInfoKey::new(table_id);
198        let raw_key = key.to_bytes();
199        self.kv_backend.exists(&raw_key).await
200    }
201
202    pub async fn get(
203        &self,
204        table_id: TableId,
205    ) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>> {
206        let key = TableInfoKey::new(table_id);
207        let raw_key = key.to_bytes();
208        self.kv_backend
209            .get(&raw_key)
210            .await?
211            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
212            .transpose()
213    }
214
215    pub async fn batch_get(
216        &self,
217        table_ids: &[TableId],
218    ) -> Result<HashMap<TableId, TableInfoValue>> {
219        let lookup_table = table_ids
220            .iter()
221            .map(|id| (TableInfoKey::new(*id).to_bytes(), id))
222            .collect::<HashMap<_, _>>();
223
224        let resp = self
225            .kv_backend
226            .batch_get(BatchGetRequest {
227                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
228            })
229            .await?;
230
231        let values = resp
232            .kvs
233            .iter()
234            .map(|kv| {
235                Ok((
236                    // Safety: must exist.
237                    **lookup_table.get(kv.key()).unwrap(),
238                    TableInfoValue::try_from_raw_value(&kv.value)?,
239                ))
240            })
241            .collect::<Result<HashMap<_, _>>>()?;
242
243        Ok(values)
244    }
245
246    /// Returns batch of `DeserializedValueWithBytes<TableInfoValue>`.
247    pub async fn batch_get_raw(
248        &self,
249        table_ids: &[TableId],
250    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableInfoValue>>> {
251        let lookup_table = table_ids
252            .iter()
253            .map(|id| (TableInfoKey::new(*id).to_bytes(), id))
254            .collect::<HashMap<_, _>>();
255
256        let resp = self
257            .kv_backend
258            .batch_get(BatchGetRequest {
259                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
260            })
261            .await?;
262
263        let values = resp
264            .kvs
265            .iter()
266            .map(|kv| {
267                Ok((
268                    // Safety: must exist.
269                    **lookup_table.get(kv.key()).unwrap(),
270                    DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
271                ))
272            })
273            .collect::<Result<HashMap<_, _>>>()?;
274
275        Ok(values)
276    }
277}
278
#[cfg(test)]
mod tests {

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, RawSchema, Schema};
    use table::metadata::{RawTableMeta, TableIdent, TableType};

    use super::*;

    /// A payload that carries no `partition_key_indices` entry must still decode,
    /// leaving that field empty.
    #[test]
    fn test_deserialization_compatibility() {
        let legacy_json = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
        let decoded = TableInfoValue::try_from_raw_value(legacy_json.as_bytes()).unwrap();
        assert!(decoded.table_info.meta.partition_key_indices.is_empty());
    }

    /// The key encodes to the prefix, a slash and the decimal table id.
    #[test]
    fn test_key_serialization() {
        assert_eq!(TableInfoKey::new(42).to_bytes(), b"__table_info/42");
    }

    /// Decoding the encoded form yields the key it was produced from.
    #[test]
    fn test_key_deserialization() {
        let decoded = TableInfoKey::from_bytes(b"__table_info/42").unwrap();
        assert_eq!(decoded, TableInfoKey::new(42));
    }

    /// A value survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_value_serialization() {
        let original = TableInfoValue {
            version: 1,
            table_info: new_table_info(42),
        };
        let bytes = original.try_as_raw_value().unwrap();
        let round_tripped = TableInfoValue::try_from_raw_value(&bytes).unwrap();
        assert_eq!(original, round_tripped);
    }

    /// Builds a small [RawTableInfo] fixture for the table identified by `table_id`.
    fn new_table_info(table_id: TableId) -> RawTableInfo {
        let name_column = ColumnSchema::new("name", ConcreteDataType::string_datatype(), true);
        let schema = Schema::new(vec![name_column]);

        let ident = TableIdent {
            table_id,
            version: 1,
        };

        let meta = RawTableMeta {
            schema: RawSchema::from(&schema),
            primary_key_indices: vec![0, 1],
            value_indices: vec![2, 3],
            engine: "mito".to_string(),
            next_column_id: 3,
            region_numbers: vec![1],
            options: Default::default(),
            created_on: chrono::DateTime::default(),
            partition_key_indices: vec![],
        };

        RawTableInfo {
            ident,
            name: "table_1".to_string(),
            desc: Some("blah".to_string()),
            catalog_name: "catalog_1".to_string(),
            schema_name: "schema_1".to_string(),
            meta,
            table_type: TableType::Base,
        }
    }
}