1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use common_base::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::error::{InvalidViewInfoSnafu, Result};
26use crate::key::txn_helper::TxnOpGetResponseSet;
27use crate::key::{
28 DeserializedValueWithBytes, MetadataKey, MetadataValue, VIEW_INFO_KEY_PATTERN,
29 VIEW_INFO_KEY_PREFIX,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::Txn;
33use crate::rpc::store::BatchGetRequest;
34
35#[derive(Debug, PartialEq)]
39pub struct ViewInfoKey {
40 view_id: TableId,
41}
42
43impl ViewInfoKey {
44 pub fn new(view_id: TableId) -> Self {
46 Self { view_id }
47 }
48}
49
50impl Display for ViewInfoKey {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
53 }
54}
55
56impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
57 fn to_bytes(&self) -> Vec<u8> {
58 self.to_string().into_bytes()
59 }
60
61 fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
62 let key = std::str::from_utf8(bytes).map_err(|e| {
63 InvalidViewInfoSnafu {
64 err_msg: format!(
65 "ViewInfoKey '{}' is not a valid UTF8 string: {e}",
66 String::from_utf8_lossy(bytes)
67 ),
68 }
69 .build()
70 })?;
71 let captures =
72 VIEW_INFO_KEY_PATTERN
73 .captures(key)
74 .with_context(|| InvalidViewInfoSnafu {
75 err_msg: format!("Invalid ViewInfoKey '{key}'"),
76 })?;
77 let view_id = captures[1].parse::<TableId>().unwrap();
79 Ok(ViewInfoKey { view_id })
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
85pub struct ViewInfoValue {
86 pub view_info: Bytes,
88 pub table_names: HashSet<TableName>,
90 pub columns: Vec<String>,
92 pub plan_columns: Vec<String>,
94 pub definition: String,
96 version: u64,
97}
98
99impl ViewInfoValue {
100 pub fn new(
101 view_info: Bytes,
102 table_names: HashSet<TableName>,
103 columns: Vec<String>,
104 plan_columns: Vec<String>,
105 definition: String,
106 ) -> Self {
107 Self {
108 view_info,
109 table_names,
110 columns,
111 plan_columns,
112 definition,
113 version: 0,
114 }
115 }
116
117 pub(crate) fn update(
118 &self,
119 new_view_info: Bytes,
120 table_names: HashSet<TableName>,
121 columns: Vec<String>,
122 plan_columns: Vec<String>,
123 definition: String,
124 ) -> Self {
125 Self {
126 view_info: new_view_info,
127 table_names,
128 columns,
129 plan_columns,
130 definition,
131 version: self.version + 1,
132 }
133 }
134}
135
136pub struct ViewInfoManager {
138 kv_backend: KvBackendRef,
139}
140
141pub type ViewInfoManagerRef = Arc<ViewInfoManager>;
142
143pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
144
145impl ViewInfoManager {
146 pub fn new(kv_backend: KvBackendRef) -> Self {
147 Self { kv_backend }
148 }
149
150 pub(crate) fn build_create_txn(
152 &self,
153 view_id: TableId,
154 view_info_value: &ViewInfoValue,
155 ) -> Result<(
156 Txn,
157 impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
158 )> {
159 let key = ViewInfoKey::new(view_id);
160 let raw_key = key.to_bytes();
161
162 let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);
163
164 Ok((
165 txn,
166 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
167 ))
168 }
169
170 pub(crate) fn build_update_txn(
173 &self,
174 view_id: TableId,
175 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
176 new_view_info_value: &ViewInfoValue,
177 ) -> Result<(
178 Txn,
179 impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
180 )> {
181 let key = ViewInfoKey::new(view_id);
182 let raw_key = key.to_bytes();
183 let raw_value = current_view_info_value.get_raw_bytes();
184 let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;
185
186 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
187
188 Ok((
189 txn,
190 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
191 ))
192 }
193
194 pub async fn get(
196 &self,
197 view_id: TableId,
198 ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
199 let key = ViewInfoKey::new(view_id);
200 let raw_key = key.to_bytes();
201 self.kv_backend
202 .get(&raw_key)
203 .await?
204 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
205 .transpose()
206 }
207
208 pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
210 let lookup_table = view_ids
211 .iter()
212 .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
213 .collect::<HashMap<_, _>>();
214
215 let resp = self
216 .kv_backend
217 .batch_get(BatchGetRequest {
218 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
219 })
220 .await?;
221
222 let values = resp
223 .kvs
224 .iter()
225 .map(|kv| {
226 Ok((
227 **lookup_table.get(kv.key()).unwrap(),
229 ViewInfoValue::try_from_raw_value(&kv.value)?,
230 ))
231 })
232 .collect::<Result<HashMap<_, _>>>()?;
233
234 Ok(values)
235 }
236
237 pub async fn batch_get_raw(
239 &self,
240 view_ids: &[TableId],
241 ) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
242 let lookup_table = view_ids
243 .iter()
244 .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
245 .collect::<HashMap<_, _>>();
246
247 let resp = self
248 .kv_backend
249 .batch_get(BatchGetRequest {
250 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
251 })
252 .await?;
253
254 let values = resp
255 .kvs
256 .iter()
257 .map(|kv| {
258 Ok((
259 **lookup_table.get(kv.key()).unwrap(),
261 DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
262 ))
263 })
264 .collect::<Result<HashMap<_, _>>>()?;
265
266 Ok(values)
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the set of two table names shared by the value tests.
    fn sample_table_names() -> HashSet<TableName> {
        ["a_table", "b_table"]
            .iter()
            .map(|&name| TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: name.to_string(),
            })
            .collect()
    }

    /// A key serializes to `__view_info/{id}`.
    #[test]
    fn test_key_serialization() {
        let raw_key = ViewInfoKey::new(42).to_bytes();
        assert_eq!(raw_key, b"__view_info/42");
    }

    /// The serialized form parses back into the same key.
    #[test]
    fn test_key_deserialization() {
        let expected = ViewInfoKey::new(42);
        let decoded = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
        assert_eq!(decoded, expected);
    }

    /// A value survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_value_serialization() {
        let value = ViewInfoValue {
            view_info: Bytes::from([1, 2, 3].as_ref()),
            version: 1,
            table_names: sample_table_names(),
            columns: vec!["a".to_string()],
            plan_columns: vec!["number".to_string()],
            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
        };

        let raw = value.try_as_raw_value().unwrap();
        let round_tripped = ViewInfoValue::try_from_raw_value(&raw).unwrap();

        assert_eq!(value, round_tripped);
    }

    /// A value serialized with `view_info` as a plain `Vec<u8>` still decodes
    /// into the current `ViewInfoValue`.
    #[test]
    fn test_deserialize_view_info_value_with_vec_u8() {
        #[derive(Serialize)]
        struct OldViewInfoValue {
            view_info: Vec<u8>,
            table_names: HashSet<TableName>,
            columns: Vec<String>,
            plan_columns: Vec<String>,
            definition: String,
            version: u64,
        }

        let table_names = sample_table_names();

        let old_value = OldViewInfoValue {
            view_info: vec![1, 2, 3],
            table_names: table_names.clone(),
            columns: vec!["a".to_string()],
            plan_columns: vec!["number".to_string()],
            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
            version: 1,
        };

        let raw = serde_json::to_vec(&old_value).unwrap();
        let decoded = ViewInfoValue::try_from_raw_value(&raw).unwrap();

        assert_eq!(decoded.view_info, vec![1, 2, 3]);
        assert_eq!(decoded.table_names, table_names);
        assert_eq!(decoded.columns, vec!["a".to_string()]);
        assert_eq!(decoded.plan_columns, vec!["number".to_string()]);
        assert_eq!(
            decoded.definition,
            "CREATE VIEW test AS SELECT * FROM numbers"
        );
    }
}