1use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::{RawTableInfo, TableId};
22use table::table_name::TableName;
23use table::table_reference::TableReference;
24
25use crate::ddl::utils::region_storage_path;
26use crate::error::{InvalidMetadataSnafu, Result};
27use crate::key::txn_helper::TxnOpGetResponseSet;
28use crate::key::{
29 DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_INFO_KEY_PATTERN,
30 TABLE_INFO_KEY_PREFIX,
31};
32use crate::kv_backend::txn::Txn;
33use crate::kv_backend::KvBackendRef;
34use crate::rpc::store::BatchGetRequest;
35
/// The key that identifies one table's info entry in the metadata store.
///
/// Its textual (and, via `to_bytes`, binary) form is
/// `{TABLE_INFO_KEY_PREFIX}/{table_id}`, as produced by the `Display` impl.
#[derive(Debug, PartialEq)]
pub struct TableInfoKey {
    /// Id of the table this key refers to.
    table_id: TableId,
}
43
44impl TableInfoKey {
45 pub fn new(table_id: TableId) -> Self {
47 Self { table_id }
48 }
49}
50
51impl Display for TableInfoKey {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 write!(f, "{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id)
54 }
55}
56
57impl MetadataKey<'_, TableInfoKey> for TableInfoKey {
58 fn to_bytes(&self) -> Vec<u8> {
59 self.to_string().into_bytes()
60 }
61
62 fn from_bytes(bytes: &[u8]) -> Result<TableInfoKey> {
63 let key = std::str::from_utf8(bytes).map_err(|e| {
64 InvalidMetadataSnafu {
65 err_msg: format!(
66 "TableInfoKey '{}' is not a valid UTF8 string: {e}",
67 String::from_utf8_lossy(bytes)
68 ),
69 }
70 .build()
71 })?;
72 let captures = TABLE_INFO_KEY_PATTERN
73 .captures(key)
74 .context(InvalidMetadataSnafu {
75 err_msg: format!("Invalid TableInfoKey '{key}'"),
76 })?;
77 let table_id = captures[1].parse::<TableId>().unwrap();
79 Ok(TableInfoKey { table_id })
80 }
81}
82
/// The value stored for a table's info: the raw table info together with a
/// revision counter.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableInfoValue {
    /// The raw table info payload.
    pub table_info: RawTableInfo,
    /// Revision counter: `0` for values created by `new`, and one more than
    /// the source value for values derived through `update` / `with_update`.
    version: u64,
}
88
89impl TableInfoValue {
90 pub fn new(table_info: RawTableInfo) -> Self {
91 Self {
92 table_info,
93 version: 0,
94 }
95 }
96
97 pub(crate) fn update(&self, new_table_info: RawTableInfo) -> Self {
98 Self {
99 table_info: new_table_info,
100 version: self.version + 1,
101 }
102 }
103
104 pub(crate) fn with_update<F>(&self, update: F) -> Self
105 where
106 F: FnOnce(&mut RawTableInfo),
107 {
108 let mut new_table_info = self.table_info.clone();
109 update(&mut new_table_info);
110 Self {
111 table_info: new_table_info,
112 version: self.version + 1,
113 }
114 }
115
116 pub fn table_ref(&self) -> TableReference {
117 TableReference::full(
118 &self.table_info.catalog_name,
119 &self.table_info.schema_name,
120 &self.table_info.name,
121 )
122 }
123
124 pub fn table_name(&self) -> TableName {
125 TableName {
126 catalog_name: self.table_info.catalog_name.to_string(),
127 schema_name: self.table_info.schema_name.to_string(),
128 table_name: self.table_info.name.to_string(),
129 }
130 }
131
132 pub fn region_storage_path(&self) -> String {
134 region_storage_path(&self.table_info.catalog_name, &self.table_info.schema_name)
135 }
136}
137
/// Shared, reference-counted handle to a [`TableInfoManager`].
pub type TableInfoManagerRef = Arc<TableInfoManager>;
139
/// Reads table info values from, and builds table info transactions for, a
/// key-value backend.
#[derive(Clone)]
pub struct TableInfoManager {
    /// Backend used for all lookups issued by this manager.
    kv_backend: KvBackendRef,
}
/// Result of decoding an optional [`TableInfoValue`] (kept together with its
/// raw bytes); this is what the decoder closures returned by
/// `build_create_txn` / `build_update_txn` yield.
pub type TableInfoDecodeResult = Result<Option<DeserializedValueWithBytes<TableInfoValue>>>;
145
146impl TableInfoManager {
147 pub fn new(kv_backend: KvBackendRef) -> Self {
148 Self { kv_backend }
149 }
150
151 pub(crate) fn build_create_txn(
153 &self,
154 table_id: TableId,
155 table_info_value: &TableInfoValue,
156 ) -> Result<(
157 Txn,
158 impl FnOnce(&mut TxnOpGetResponseSet) -> TableInfoDecodeResult,
159 )> {
160 let key = TableInfoKey::new(table_id);
161 let raw_key = key.to_bytes();
162
163 let txn = Txn::put_if_not_exists(raw_key.clone(), table_info_value.try_as_raw_value()?);
164
165 Ok((
166 txn,
167 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
168 ))
169 }
170
171 pub(crate) fn build_update_txn(
174 &self,
175 table_id: TableId,
176 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
177 new_table_info_value: &TableInfoValue,
178 ) -> Result<(
179 Txn,
180 impl FnOnce(&mut TxnOpGetResponseSet) -> TableInfoDecodeResult,
181 )> {
182 let key = TableInfoKey::new(table_id);
183 let raw_key = key.to_bytes();
184 let raw_value = current_table_info_value.get_raw_bytes();
185 let new_raw_value: Vec<u8> = new_table_info_value.try_as_raw_value()?;
186
187 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
188
189 Ok((
190 txn,
191 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
192 ))
193 }
194
195 pub async fn exists(&self, table_id: TableId) -> Result<bool> {
197 let key = TableInfoKey::new(table_id);
198 let raw_key = key.to_bytes();
199 self.kv_backend.exists(&raw_key).await
200 }
201
202 pub async fn get(
203 &self,
204 table_id: TableId,
205 ) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>> {
206 let key = TableInfoKey::new(table_id);
207 let raw_key = key.to_bytes();
208 self.kv_backend
209 .get(&raw_key)
210 .await?
211 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
212 .transpose()
213 }
214
215 pub async fn batch_get(
216 &self,
217 table_ids: &[TableId],
218 ) -> Result<HashMap<TableId, TableInfoValue>> {
219 let lookup_table = table_ids
220 .iter()
221 .map(|id| (TableInfoKey::new(*id).to_bytes(), id))
222 .collect::<HashMap<_, _>>();
223
224 let resp = self
225 .kv_backend
226 .batch_get(BatchGetRequest {
227 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
228 })
229 .await?;
230
231 let values = resp
232 .kvs
233 .iter()
234 .map(|kv| {
235 Ok((
236 **lookup_table.get(kv.key()).unwrap(),
238 TableInfoValue::try_from_raw_value(&kv.value)?,
239 ))
240 })
241 .collect::<Result<HashMap<_, _>>>()?;
242
243 Ok(values)
244 }
245
246 pub async fn batch_get_raw(
248 &self,
249 table_ids: &[TableId],
250 ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableInfoValue>>> {
251 let lookup_table = table_ids
252 .iter()
253 .map(|id| (TableInfoKey::new(*id).to_bytes(), id))
254 .collect::<HashMap<_, _>>();
255
256 let resp = self
257 .kv_backend
258 .batch_get(BatchGetRequest {
259 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
260 })
261 .await?;
262
263 let values = resp
264 .kvs
265 .iter()
266 .map(|kv| {
267 Ok((
268 **lookup_table.get(kv.key()).unwrap(),
270 DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
271 ))
272 })
273 .collect::<Result<HashMap<_, _>>>()?;
274
275 Ok(values)
276 }
277}
278
#[cfg(test)]
mod tests {

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, RawSchema, Schema};
    use table::metadata::{RawTableMeta, TableIdent, TableType};

    use super::*;

    /// Builds a small `RawTableInfo` fixture for the given table id.
    fn new_table_info(table_id: TableId) -> RawTableInfo {
        let name_column = ColumnSchema::new("name", ConcreteDataType::string_datatype(), true);
        let schema = Schema::new(vec![name_column]);

        let meta = RawTableMeta {
            schema: RawSchema::from(&schema),
            engine: "mito".to_string(),
            created_on: chrono::DateTime::default(),
            primary_key_indices: vec![0, 1],
            next_column_id: 3,
            value_indices: vec![2, 3],
            options: Default::default(),
            region_numbers: vec![1],
            partition_key_indices: vec![],
        };

        let ident = TableIdent {
            table_id,
            version: 1,
        };

        RawTableInfo {
            ident,
            name: "table_1".to_string(),
            desc: Some("blah".to_string()),
            catalog_name: "catalog_1".to_string(),
            schema_name: "schema_1".to_string(),
            meta,
            table_type: TableType::Base,
        }
    }

    /// A payload without `partition_key_indices` must still deserialize, with
    /// the field ending up empty.
    #[test]
    fn test_deserialization_compatibility() {
        let payload = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
        let decoded = TableInfoValue::try_from_raw_value(payload.as_bytes()).unwrap();
        assert!(decoded.table_info.meta.partition_key_indices.is_empty());
    }

    /// The key encodes to `__table_info/<id>`.
    #[test]
    fn test_key_serialization() {
        let encoded = TableInfoKey::new(42).to_bytes();
        assert_eq!(encoded, b"__table_info/42");
    }

    /// Decoding the encoded form yields the original key.
    #[test]
    fn test_key_deserialization() {
        let decoded = TableInfoKey::from_bytes(b"__table_info/42").unwrap();
        let expected = TableInfoKey::new(42);
        assert_eq!(decoded, expected);
    }

    /// A value survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_value_serialization() {
        let original = TableInfoValue {
            table_info: new_table_info(42),
            version: 1,
        };
        let raw = original.try_as_raw_value().unwrap();
        let round_tripped = TableInfoValue::try_from_raw_value(&raw).unwrap();
        assert_eq!(original, round_tripped);
    }
}