1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::error::{InvalidViewInfoSnafu, Result};
25use crate::key::txn_helper::TxnOpGetResponseSet;
26use crate::key::{
27 DeserializedValueWithBytes, MetadataKey, MetadataValue, VIEW_INFO_KEY_PATTERN,
28 VIEW_INFO_KEY_PREFIX,
29};
30use crate::kv_backend::txn::Txn;
31use crate::kv_backend::KvBackendRef;
32use crate::rpc::store::BatchGetRequest;
33
/// Raw, byte-encoded form of a view's logical plan, as stored in
/// [`ViewInfoValue::view_info`].
type RawViewLogicalPlan = Vec<u8>;
36
37#[derive(Debug, PartialEq)]
41pub struct ViewInfoKey {
42 view_id: TableId,
43}
44
45impl ViewInfoKey {
46 pub fn new(view_id: TableId) -> Self {
48 Self { view_id }
49 }
50}
51
52impl Display for ViewInfoKey {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
55 }
56}
57
58impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
59 fn to_bytes(&self) -> Vec<u8> {
60 self.to_string().into_bytes()
61 }
62
63 fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
64 let key = std::str::from_utf8(bytes).map_err(|e| {
65 InvalidViewInfoSnafu {
66 err_msg: format!(
67 "ViewInfoKey '{}' is not a valid UTF8 string: {e}",
68 String::from_utf8_lossy(bytes)
69 ),
70 }
71 .build()
72 })?;
73 let captures = VIEW_INFO_KEY_PATTERN
74 .captures(key)
75 .context(InvalidViewInfoSnafu {
76 err_msg: format!("Invalid ViewInfoKey '{key}'"),
77 })?;
78 let view_id = captures[1].parse::<TableId>().unwrap();
80 Ok(ViewInfoKey { view_id })
81 }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
86pub struct ViewInfoValue {
87 pub view_info: RawViewLogicalPlan,
89 pub table_names: HashSet<TableName>,
91 pub columns: Vec<String>,
93 pub plan_columns: Vec<String>,
95 pub definition: String,
97 version: u64,
98}
99
100impl ViewInfoValue {
101 pub fn new(
102 view_info: RawViewLogicalPlan,
103 table_names: HashSet<TableName>,
104 columns: Vec<String>,
105 plan_columns: Vec<String>,
106 definition: String,
107 ) -> Self {
108 Self {
109 view_info,
110 table_names,
111 columns,
112 plan_columns,
113 definition,
114 version: 0,
115 }
116 }
117
118 pub(crate) fn update(
119 &self,
120 new_view_info: RawViewLogicalPlan,
121 table_names: HashSet<TableName>,
122 columns: Vec<String>,
123 plan_columns: Vec<String>,
124 definition: String,
125 ) -> Self {
126 Self {
127 view_info: new_view_info,
128 table_names,
129 columns,
130 plan_columns,
131 definition,
132 version: self.version + 1,
133 }
134 }
135}
136
137pub struct ViewInfoManager {
139 kv_backend: KvBackendRef,
140}
141
142pub type ViewInfoManagerRef = Arc<ViewInfoManager>;
143
144pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
145
146impl ViewInfoManager {
147 pub fn new(kv_backend: KvBackendRef) -> Self {
148 Self { kv_backend }
149 }
150
151 pub(crate) fn build_create_txn(
153 &self,
154 view_id: TableId,
155 view_info_value: &ViewInfoValue,
156 ) -> Result<(
157 Txn,
158 impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
159 )> {
160 let key = ViewInfoKey::new(view_id);
161 let raw_key = key.to_bytes();
162
163 let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);
164
165 Ok((
166 txn,
167 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
168 ))
169 }
170
171 pub(crate) fn build_update_txn(
174 &self,
175 view_id: TableId,
176 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
177 new_view_info_value: &ViewInfoValue,
178 ) -> Result<(
179 Txn,
180 impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
181 )> {
182 let key = ViewInfoKey::new(view_id);
183 let raw_key = key.to_bytes();
184 let raw_value = current_view_info_value.get_raw_bytes();
185 let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;
186
187 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
188
189 Ok((
190 txn,
191 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
192 ))
193 }
194
195 pub async fn get(
197 &self,
198 view_id: TableId,
199 ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
200 let key = ViewInfoKey::new(view_id);
201 let raw_key = key.to_bytes();
202 self.kv_backend
203 .get(&raw_key)
204 .await?
205 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
206 .transpose()
207 }
208
209 pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
211 let lookup_table = view_ids
212 .iter()
213 .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
214 .collect::<HashMap<_, _>>();
215
216 let resp = self
217 .kv_backend
218 .batch_get(BatchGetRequest {
219 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
220 })
221 .await?;
222
223 let values = resp
224 .kvs
225 .iter()
226 .map(|kv| {
227 Ok((
228 **lookup_table.get(kv.key()).unwrap(),
230 ViewInfoValue::try_from_raw_value(&kv.value)?,
231 ))
232 })
233 .collect::<Result<HashMap<_, _>>>()?;
234
235 Ok(values)
236 }
237
238 pub async fn batch_get_raw(
240 &self,
241 view_ids: &[TableId],
242 ) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
243 let lookup_table = view_ids
244 .iter()
245 .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
246 .collect::<HashMap<_, _>>();
247
248 let resp = self
249 .kv_backend
250 .batch_get(BatchGetRequest {
251 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
252 })
253 .await?;
254
255 let values = resp
256 .kvs
257 .iter()
258 .map(|kv| {
259 Ok((
260 **lookup_table.get(kv.key()).unwrap(),
262 DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
263 ))
264 })
265 .collect::<Result<HashMap<_, _>>>()?;
266
267 Ok(values)
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `greptime.public.<name>` table name for the fixtures below.
    fn public_table(name: &str) -> TableName {
        TableName {
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            table_name: name.to_string(),
        }
    }

    /// A key serializes to `<prefix>/<view id>`.
    #[test]
    fn test_key_serialization() {
        let raw_key = ViewInfoKey::new(42).to_bytes();
        assert_eq!(raw_key, b"__view_info/42");
    }

    /// A serialized key decodes back to the key it was produced from.
    #[test]
    fn test_key_deserialization() {
        let expected = ViewInfoKey::new(42);
        let decoded = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
        assert_eq!(decoded, expected);
    }

    /// A value survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_value_serialization() {
        let table_names: HashSet<TableName> = ["a_table", "b_table"]
            .into_iter()
            .map(public_table)
            .collect();

        let value = ViewInfoValue {
            view_info: vec![1, 2, 3],
            version: 1,
            table_names,
            columns: vec!["a".to_string()],
            plan_columns: vec!["number".to_string()],
            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
        };

        let raw = value.try_as_raw_value().unwrap();
        let round_tripped = ViewInfoValue::try_from_raw_value(&raw).unwrap();
        assert_eq!(value, round_tripped);
    }
}