common_meta/key/
view_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::error::{InvalidViewInfoSnafu, Result};
25use crate::key::txn_helper::TxnOpGetResponseSet;
26use crate::key::{
27    DeserializedValueWithBytes, MetadataKey, MetadataValue, VIEW_INFO_KEY_PATTERN,
28    VIEW_INFO_KEY_PREFIX,
29};
30use crate::kv_backend::txn::Txn;
31use crate::kv_backend::KvBackendRef;
32use crate::rpc::store::BatchGetRequest;
33
/// Encoded bytes of a VIEW's logical plan.
type RawViewLogicalPlan = Vec<u8>;
36
/// The key under which the metadata of a view is stored.
///
/// The layout: `__view_info/{view_id}`.
#[derive(Debug, PartialEq)]
pub struct ViewInfoKey {
    /// Id of the view this key addresses.
    view_id: TableId,
}
44
45impl ViewInfoKey {
46    /// Returns a new `[ViewInfoKey]`.
47    pub fn new(view_id: TableId) -> Self {
48        Self { view_id }
49    }
50}
51
52impl Display for ViewInfoKey {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
55    }
56}
57
58impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
59    fn to_bytes(&self) -> Vec<u8> {
60        self.to_string().into_bytes()
61    }
62
63    fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
64        let key = std::str::from_utf8(bytes).map_err(|e| {
65            InvalidViewInfoSnafu {
66                err_msg: format!(
67                    "ViewInfoKey '{}' is not a valid UTF8 string: {e}",
68                    String::from_utf8_lossy(bytes)
69                ),
70            }
71            .build()
72        })?;
73        let captures = VIEW_INFO_KEY_PATTERN
74            .captures(key)
75            .context(InvalidViewInfoSnafu {
76                err_msg: format!("Invalid ViewInfoKey '{key}'"),
77            })?;
78        // Safety: pass the regex check above
79        let view_id = captures[1].parse::<TableId>().unwrap();
80        Ok(ViewInfoKey { view_id })
81    }
82}
83
/// The VIEW info value that keeps the metadata.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ViewInfoValue {
    /// The encoded logical plan.
    pub view_info: RawViewLogicalPlan,
    /// The fully resolved table names referenced in the logical plan.
    pub table_names: HashSet<TableName>,
    /// The view columns.
    pub columns: Vec<String>,
    /// The original plan columns.
    pub plan_columns: Vec<String>,
    /// The SQL used to create the view.
    pub definition: String,
    /// Version counter: `0` for a value built by `new`, incremented by one on each `update`.
    version: u64,
}
99
100impl ViewInfoValue {
101    pub fn new(
102        view_info: RawViewLogicalPlan,
103        table_names: HashSet<TableName>,
104        columns: Vec<String>,
105        plan_columns: Vec<String>,
106        definition: String,
107    ) -> Self {
108        Self {
109            view_info,
110            table_names,
111            columns,
112            plan_columns,
113            definition,
114            version: 0,
115        }
116    }
117
118    pub(crate) fn update(
119        &self,
120        new_view_info: RawViewLogicalPlan,
121        table_names: HashSet<TableName>,
122        columns: Vec<String>,
123        plan_columns: Vec<String>,
124        definition: String,
125    ) -> Self {
126        Self {
127            view_info: new_view_info,
128            table_names,
129            columns,
130            plan_columns,
131            definition,
132            version: self.version + 1,
133        }
134    }
135}
136
/// The manager of [`ViewInfoValue`] entries kept in a key-value backend.
pub struct ViewInfoManager {
    /// The backend the view info entries are read from.
    kv_backend: KvBackendRef,
}
141
/// A shared, reference-counted handle to a [`ViewInfoManager`].
pub type ViewInfoManagerRef = Arc<ViewInfoManager>;

/// The result of decoding an optional [`ViewInfoValue`] (kept together with its raw
/// bytes) out of a transaction response.
pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
145
146impl ViewInfoManager {
147    pub fn new(kv_backend: KvBackendRef) -> Self {
148        Self { kv_backend }
149    }
150
151    /// Builds a create view info transaction, it expected the `__view_info/{view_id}` wasn't occupied.
152    pub(crate) fn build_create_txn(
153        &self,
154        view_id: TableId,
155        view_info_value: &ViewInfoValue,
156    ) -> Result<(
157        Txn,
158        impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
159    )> {
160        let key = ViewInfoKey::new(view_id);
161        let raw_key = key.to_bytes();
162
163        let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);
164
165        Ok((
166            txn,
167            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
168        ))
169    }
170
171    /// Builds a update view info transaction, it expected the remote value equals the `current_current_view_info_value`.
172    /// It retrieves the latest value if the comparing failed.
173    pub(crate) fn build_update_txn(
174        &self,
175        view_id: TableId,
176        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
177        new_view_info_value: &ViewInfoValue,
178    ) -> Result<(
179        Txn,
180        impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
181    )> {
182        let key = ViewInfoKey::new(view_id);
183        let raw_key = key.to_bytes();
184        let raw_value = current_view_info_value.get_raw_bytes();
185        let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;
186
187        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
188
189        Ok((
190            txn,
191            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
192        ))
193    }
194
195    /// Get the `[ViewInfoValue]` by the view id
196    pub async fn get(
197        &self,
198        view_id: TableId,
199    ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
200        let key = ViewInfoKey::new(view_id);
201        let raw_key = key.to_bytes();
202        self.kv_backend
203            .get(&raw_key)
204            .await?
205            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
206            .transpose()
207    }
208
209    /// Get the `[ViewInfoValue]` by the view id slice in batch
210    pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
211        let lookup_table = view_ids
212            .iter()
213            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
214            .collect::<HashMap<_, _>>();
215
216        let resp = self
217            .kv_backend
218            .batch_get(BatchGetRequest {
219                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
220            })
221            .await?;
222
223        let values = resp
224            .kvs
225            .iter()
226            .map(|kv| {
227                Ok((
228                    // Safety: must exist.
229                    **lookup_table.get(kv.key()).unwrap(),
230                    ViewInfoValue::try_from_raw_value(&kv.value)?,
231                ))
232            })
233            .collect::<Result<HashMap<_, _>>>()?;
234
235        Ok(values)
236    }
237
238    /// Returns batch of `DeserializedValueWithBytes<ViewInfoValue>`.
239    pub async fn batch_get_raw(
240        &self,
241        view_ids: &[TableId],
242    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
243        let lookup_table = view_ids
244            .iter()
245            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
246            .collect::<HashMap<_, _>>();
247
248        let resp = self
249            .kv_backend
250            .batch_get(BatchGetRequest {
251                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
252            })
253            .await?;
254
255        let values = resp
256            .kvs
257            .iter()
258            .map(|kv| {
259                Ok((
260                    // Safety: must exist.
261                    **lookup_table.get(kv.key()).unwrap(),
262                    DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
263                ))
264            })
265            .collect::<Result<HashMap<_, _>>>()?;
266
267        Ok(values)
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TableName` located in the `greptime.public` schema.
    fn public_table(table: &str) -> TableName {
        TableName {
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            table_name: table.to_string(),
        }
    }

    /// A key must encode to `__view_info/{view_id}`.
    #[test]
    fn test_key_serialization() {
        assert_eq!(ViewInfoKey::new(42).to_bytes(), b"__view_info/42");
    }

    /// Decoding the encoded form must yield the same key.
    #[test]
    fn test_key_deserialization() {
        let decoded = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
        assert_eq!(decoded, ViewInfoKey::new(42));
    }

    /// A value must survive a serialize/deserialize round trip unchanged.
    #[test]
    fn test_value_serialization() {
        let table_names: HashSet<TableName> = ["a_table", "b_table"]
            .iter()
            .map(|table| public_table(table))
            .collect();

        let original = ViewInfoValue {
            view_info: vec![1, 2, 3],
            version: 1,
            table_names,
            columns: vec!["a".to_string()],
            plan_columns: vec!["number".to_string()],
            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
        };

        let raw = original.try_as_raw_value().unwrap();
        let round_tripped = ViewInfoValue::try_from_raw_value(&raw).unwrap();
        assert_eq!(original, round_tripped);
    }
}