Skip to main content

common_meta/key/
view_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use common_base::bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::error::{InvalidViewInfoSnafu, Result};
26use crate::key::txn_helper::TxnOpGetResponseSet;
27use crate::key::{
28    DeserializedValueWithBytes, MetadataKey, MetadataValue, VIEW_INFO_KEY_PATTERN,
29    VIEW_INFO_KEY_PREFIX,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::Txn;
33use crate::rpc::store::BatchGetRequest;
34
35/// The key stores the metadata of the view.
36///
37/// The layout: `__view_info/{view_id}`.
38#[derive(Debug, PartialEq)]
39pub struct ViewInfoKey {
40    view_id: TableId,
41}
42
43impl ViewInfoKey {
44    /// Returns a new `[ViewInfoKey]`.
45    pub fn new(view_id: TableId) -> Self {
46        Self { view_id }
47    }
48}
49
50impl Display for ViewInfoKey {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
53    }
54}
55
56impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
57    fn to_bytes(&self) -> Vec<u8> {
58        self.to_string().into_bytes()
59    }
60
61    fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
62        let key = std::str::from_utf8(bytes).map_err(|e| {
63            InvalidViewInfoSnafu {
64                err_msg: format!(
65                    "ViewInfoKey '{}' is not a valid UTF8 string: {e}",
66                    String::from_utf8_lossy(bytes)
67                ),
68            }
69            .build()
70        })?;
71        let captures =
72            VIEW_INFO_KEY_PATTERN
73                .captures(key)
74                .with_context(|| InvalidViewInfoSnafu {
75                    err_msg: format!("Invalid ViewInfoKey '{key}'"),
76                })?;
77        // Safety: pass the regex check above
78        let view_id = captures[1].parse::<TableId>().unwrap();
79        Ok(ViewInfoKey { view_id })
80    }
81}
82
83/// The VIEW info value that keeps the metadata.
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
85pub struct ViewInfoValue {
86    // The encoded logical plan
87    pub view_info: Bytes,
88    // The resolved fully table names in logical plan
89    pub table_names: HashSet<TableName>,
90    // The view columns
91    pub columns: Vec<String>,
92    // The original plan columns
93    pub plan_columns: Vec<String>,
94    // The SQL to create the view
95    pub definition: String,
96    version: u64,
97}
98
99impl ViewInfoValue {
100    pub fn new(
101        view_info: Bytes,
102        table_names: HashSet<TableName>,
103        columns: Vec<String>,
104        plan_columns: Vec<String>,
105        definition: String,
106    ) -> Self {
107        Self {
108            view_info,
109            table_names,
110            columns,
111            plan_columns,
112            definition,
113            version: 0,
114        }
115    }
116
117    pub(crate) fn update(
118        &self,
119        new_view_info: Bytes,
120        table_names: HashSet<TableName>,
121        columns: Vec<String>,
122        plan_columns: Vec<String>,
123        definition: String,
124    ) -> Self {
125        Self {
126            view_info: new_view_info,
127            table_names,
128            columns,
129            plan_columns,
130            definition,
131            version: self.version + 1,
132        }
133    }
134}
135
136/// The `[ViewInfo]` manager
137pub struct ViewInfoManager {
138    kv_backend: KvBackendRef,
139}
140
141pub type ViewInfoManagerRef = Arc<ViewInfoManager>;
142
143pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
144
145impl ViewInfoManager {
146    pub fn new(kv_backend: KvBackendRef) -> Self {
147        Self { kv_backend }
148    }
149
150    /// Builds a create view info transaction, it expected the `__view_info/{view_id}` wasn't occupied.
151    pub(crate) fn build_create_txn(
152        &self,
153        view_id: TableId,
154        view_info_value: &ViewInfoValue,
155    ) -> Result<(
156        Txn,
157        impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
158    )> {
159        let key = ViewInfoKey::new(view_id);
160        let raw_key = key.to_bytes();
161
162        let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);
163
164        Ok((
165            txn,
166            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
167        ))
168    }
169
170    /// Builds a update view info transaction, it expected the remote value equals the `current_current_view_info_value`.
171    /// It retrieves the latest value if the comparing failed.
172    pub(crate) fn build_update_txn(
173        &self,
174        view_id: TableId,
175        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
176        new_view_info_value: &ViewInfoValue,
177    ) -> Result<(
178        Txn,
179        impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
180    )> {
181        let key = ViewInfoKey::new(view_id);
182        let raw_key = key.to_bytes();
183        let raw_value = current_view_info_value.get_raw_bytes();
184        let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;
185
186        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
187
188        Ok((
189            txn,
190            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
191        ))
192    }
193
194    /// Get the `[ViewInfoValue]` by the view id
195    pub async fn get(
196        &self,
197        view_id: TableId,
198    ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
199        let key = ViewInfoKey::new(view_id);
200        let raw_key = key.to_bytes();
201        self.kv_backend
202            .get(&raw_key)
203            .await?
204            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
205            .transpose()
206    }
207
208    /// Get the `[ViewInfoValue]` by the view id slice in batch
209    pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
210        let lookup_table = view_ids
211            .iter()
212            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
213            .collect::<HashMap<_, _>>();
214
215        let resp = self
216            .kv_backend
217            .batch_get(BatchGetRequest {
218                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
219            })
220            .await?;
221
222        let values = resp
223            .kvs
224            .iter()
225            .map(|kv| {
226                Ok((
227                    // Safety: must exist.
228                    **lookup_table.get(kv.key()).unwrap(),
229                    ViewInfoValue::try_from_raw_value(&kv.value)?,
230                ))
231            })
232            .collect::<Result<HashMap<_, _>>>()?;
233
234        Ok(values)
235    }
236
237    /// Returns batch of `DeserializedValueWithBytes<ViewInfoValue>`.
238    pub async fn batch_get_raw(
239        &self,
240        view_ids: &[TableId],
241    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
242        let lookup_table = view_ids
243            .iter()
244            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
245            .collect::<HashMap<_, _>>();
246
247        let resp = self
248            .kv_backend
249            .batch_get(BatchGetRequest {
250                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
251            })
252            .await?;
253
254        let values = resp
255            .kvs
256            .iter()
257            .map(|kv| {
258                Ok((
259                    // Safety: must exist.
260                    **lookup_table.get(kv.key()).unwrap(),
261                    DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
262                ))
263            })
264            .collect::<Result<HashMap<_, _>>>()?;
265
266        Ok(values)
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273
274    #[test]
275    fn test_key_serialization() {
276        let key = ViewInfoKey::new(42);
277        let raw_key = key.to_bytes();
278        assert_eq!(raw_key, b"__view_info/42");
279    }
280
281    #[test]
282    fn test_key_deserialization() {
283        let expected = ViewInfoKey::new(42);
284        let key = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
285        assert_eq!(key, expected);
286    }
287
288    #[test]
289    fn test_value_serialization() {
290        let table_names = {
291            let mut set = HashSet::new();
292            set.insert(TableName {
293                catalog_name: "greptime".to_string(),
294                schema_name: "public".to_string(),
295                table_name: "a_table".to_string(),
296            });
297            set.insert(TableName {
298                catalog_name: "greptime".to_string(),
299                schema_name: "public".to_string(),
300                table_name: "b_table".to_string(),
301            });
302            set
303        };
304
305        let value = ViewInfoValue {
306            view_info: Bytes::from([1, 2, 3].as_ref()),
307            version: 1,
308            table_names,
309            columns: vec!["a".to_string()],
310            plan_columns: vec!["number".to_string()],
311            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
312        };
313        let serialized = value.try_as_raw_value().unwrap();
314        let deserialized = ViewInfoValue::try_from_raw_value(&serialized).unwrap();
315        assert_eq!(value, deserialized);
316    }
317
318    #[test]
319    fn test_deserialize_view_info_value_with_vec_u8() {
320        #[derive(Serialize)]
321        struct OldViewInfoValue {
322            view_info: Vec<u8>,
323            table_names: HashSet<TableName>,
324            columns: Vec<String>,
325            plan_columns: Vec<String>,
326            definition: String,
327            version: u64,
328        }
329
330        let table_names = {
331            let mut set = HashSet::new();
332            set.insert(TableName {
333                catalog_name: "greptime".to_string(),
334                schema_name: "public".to_string(),
335                table_name: "a_table".to_string(),
336            });
337            set.insert(TableName {
338                catalog_name: "greptime".to_string(),
339                schema_name: "public".to_string(),
340                table_name: "b_table".to_string(),
341            });
342            set
343        };
344
345        let old_value = OldViewInfoValue {
346            view_info: vec![1, 2, 3],
347            table_names: table_names.clone(),
348            columns: vec!["a".to_string()],
349            plan_columns: vec!["number".to_string()],
350            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
351            version: 1,
352        };
353
354        let serialized = serde_json::to_vec(&old_value).unwrap();
355        let deserialized = ViewInfoValue::try_from_raw_value(&serialized).unwrap();
356
357        assert_eq!(deserialized.view_info, vec![1, 2, 3]);
358        assert_eq!(deserialized.table_names, table_names);
359        assert_eq!(deserialized.columns, vec!["a".to_string()]);
360        assert_eq!(deserialized.plan_columns, vec!["number".to_string()]);
361        assert_eq!(
362            deserialized.definition,
363            "CREATE VIEW test AS SELECT * FROM numbers"
364        );
365    }
366}