1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::error::{InvalidViewInfoSnafu, Result};
25use crate::key::txn_helper::TxnOpGetResponseSet;
26use crate::key::{
27 DeserializedValueWithBytes, MetadataKey, MetadataValue, VIEW_INFO_KEY_PATTERN,
28 VIEW_INFO_KEY_PREFIX,
29};
30use crate::kv_backend::txn::Txn;
31use crate::kv_backend::KvBackendRef;
32use crate::rpc::store::BatchGetRequest;
33
/// Byte-buffer alias used as the type of the `view_info` field of
/// [`ViewInfoValue`].
///
/// NOTE(review): the name suggests these bytes hold an encoded logical plan of
/// the view; the encoding is not visible in this file — confirm with the code
/// that produces the bytes.
type RawViewLogicalPlan = Vec<u8>;
36
/// Metadata key that addresses the stored info of a single view.
///
/// Its textual form is `{VIEW_INFO_KEY_PREFIX}/{view_id}` (see the `Display`
/// implementation); the raw key bytes are the UTF-8 encoding of that text.
#[derive(Debug, PartialEq)]
pub struct ViewInfoKey {
    /// Id of the view this key refers to.
    view_id: TableId,
}
44
45impl ViewInfoKey {
46 pub fn new(view_id: TableId) -> Self {
48 Self { view_id }
49 }
50}
51
52impl Display for ViewInfoKey {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
55 }
56}
57
58impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
59 fn to_bytes(&self) -> Vec<u8> {
60 self.to_string().into_bytes()
61 }
62
63 fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
64 let key = std::str::from_utf8(bytes).map_err(|e| {
65 InvalidViewInfoSnafu {
66 err_msg: format!(
67 "ViewInfoKey '{}' is not a valid UTF8 string: {e}",
68 String::from_utf8_lossy(bytes)
69 ),
70 }
71 .build()
72 })?;
73 let captures =
74 VIEW_INFO_KEY_PATTERN
75 .captures(key)
76 .with_context(|| InvalidViewInfoSnafu {
77 err_msg: format!("Invalid ViewInfoKey '{key}'"),
78 })?;
79 let view_id = captures[1].parse::<TableId>().unwrap();
81 Ok(ViewInfoKey { view_id })
82 }
83}
84
/// Value stored for a view: its raw plan bytes plus descriptive metadata.
///
/// The value is serializable (`Serialize`/`Deserialize`) and carries a
/// `version` counter that starts at `0` in [`ViewInfoValue::new`] and is
/// incremented by one on every `update`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ViewInfoValue {
    /// Raw bytes describing the view.
    /// NOTE(review): presumably the encoded logical plan — confirm.
    pub view_info: RawViewLogicalPlan,
    /// Set of table names associated with the view.
    /// NOTE(review): presumably the tables the view reads from — confirm.
    pub table_names: HashSet<TableName>,
    /// Column names of the view.
    /// NOTE(review): exact relation to `plan_columns` is not visible here.
    pub columns: Vec<String>,
    /// Column names associated with the plan.
    /// NOTE(review): presumably the plan's output columns — confirm.
    pub plan_columns: Vec<String>,
    /// Textual definition of the view (the tests in this file use a
    /// `CREATE VIEW ...` SQL statement).
    pub definition: String,
    /// Revision counter: `0` for a freshly created value, `+1` per `update`.
    version: u64,
}
100
101impl ViewInfoValue {
102 pub fn new(
103 view_info: RawViewLogicalPlan,
104 table_names: HashSet<TableName>,
105 columns: Vec<String>,
106 plan_columns: Vec<String>,
107 definition: String,
108 ) -> Self {
109 Self {
110 view_info,
111 table_names,
112 columns,
113 plan_columns,
114 definition,
115 version: 0,
116 }
117 }
118
119 pub(crate) fn update(
120 &self,
121 new_view_info: RawViewLogicalPlan,
122 table_names: HashSet<TableName>,
123 columns: Vec<String>,
124 plan_columns: Vec<String>,
125 definition: String,
126 ) -> Self {
127 Self {
128 view_info: new_view_info,
129 table_names,
130 columns,
131 plan_columns,
132 definition,
133 version: self.version + 1,
134 }
135 }
136}
137
/// Reads view info values from, and builds write transactions for, a
/// key-value backend. Entries are keyed by [`ViewInfoKey`].
pub struct ViewInfoManager {
    /// Backend used for the `get` / `batch_get` lookups.
    kv_backend: KvBackendRef,
}
142
/// Shared, reference-counted handle to a [`ViewInfoManager`].
pub type ViewInfoManagerRef = Arc<ViewInfoManager>;

/// Outcome of decoding an optional [`ViewInfoValue`] wrapped in
/// `DeserializedValueWithBytes`; this is the return type of the decoder
/// closures handed out by `build_create_txn` and `build_update_txn`.
pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
146
147impl ViewInfoManager {
148 pub fn new(kv_backend: KvBackendRef) -> Self {
149 Self { kv_backend }
150 }
151
152 pub(crate) fn build_create_txn(
154 &self,
155 view_id: TableId,
156 view_info_value: &ViewInfoValue,
157 ) -> Result<(
158 Txn,
159 impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
160 )> {
161 let key = ViewInfoKey::new(view_id);
162 let raw_key = key.to_bytes();
163
164 let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);
165
166 Ok((
167 txn,
168 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
169 ))
170 }
171
172 pub(crate) fn build_update_txn(
175 &self,
176 view_id: TableId,
177 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
178 new_view_info_value: &ViewInfoValue,
179 ) -> Result<(
180 Txn,
181 impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
182 )> {
183 let key = ViewInfoKey::new(view_id);
184 let raw_key = key.to_bytes();
185 let raw_value = current_view_info_value.get_raw_bytes();
186 let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;
187
188 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
189
190 Ok((
191 txn,
192 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
193 ))
194 }
195
196 pub async fn get(
198 &self,
199 view_id: TableId,
200 ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
201 let key = ViewInfoKey::new(view_id);
202 let raw_key = key.to_bytes();
203 self.kv_backend
204 .get(&raw_key)
205 .await?
206 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
207 .transpose()
208 }
209
210 pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
212 let lookup_table = view_ids
213 .iter()
214 .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
215 .collect::<HashMap<_, _>>();
216
217 let resp = self
218 .kv_backend
219 .batch_get(BatchGetRequest {
220 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
221 })
222 .await?;
223
224 let values = resp
225 .kvs
226 .iter()
227 .map(|kv| {
228 Ok((
229 **lookup_table.get(kv.key()).unwrap(),
231 ViewInfoValue::try_from_raw_value(&kv.value)?,
232 ))
233 })
234 .collect::<Result<HashMap<_, _>>>()?;
235
236 Ok(values)
237 }
238
239 pub async fn batch_get_raw(
241 &self,
242 view_ids: &[TableId],
243 ) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
244 let lookup_table = view_ids
245 .iter()
246 .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
247 .collect::<HashMap<_, _>>();
248
249 let resp = self
250 .kv_backend
251 .batch_get(BatchGetRequest {
252 keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
253 })
254 .await?;
255
256 let values = resp
257 .kvs
258 .iter()
259 .map(|kv| {
260 Ok((
261 **lookup_table.get(kv.key()).unwrap(),
263 DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
264 ))
265 })
266 .collect::<Result<HashMap<_, _>>>()?;
267
268 Ok(values)
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// A key encodes to the bytes of `<prefix>/<id>`.
    #[test]
    fn test_key_serialization() {
        let raw_key = ViewInfoKey::new(42).to_bytes();
        assert_eq!(raw_key, b"__view_info/42");
    }

    /// Decoding the encoded form yields the original key.
    #[test]
    fn test_key_deserialization() {
        let decoded = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
        assert_eq!(decoded, ViewInfoKey::new(42));
    }

    /// A value survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_value_serialization() {
        let table_names: HashSet<TableName> = ["a_table", "b_table"]
            .iter()
            .map(|table| TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: table.to_string(),
            })
            .collect();

        let original = ViewInfoValue {
            view_info: vec![1, 2, 3],
            version: 1,
            table_names,
            columns: vec!["a".to_string()],
            plan_columns: vec!["number".to_string()],
            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
        };

        let raw = original.try_as_raw_value().unwrap();
        let round_tripped = ViewInfoValue::try_from_raw_value(&raw).unwrap();
        assert_eq!(original, round_tripped);
    }
}