common_meta/key/
view_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::error::{InvalidViewInfoSnafu, Result};
25use crate::key::txn_helper::TxnOpGetResponseSet;
26use crate::key::{
27    DeserializedValueWithBytes, MetadataKey, MetadataValue, VIEW_INFO_KEY_PATTERN,
28    VIEW_INFO_KEY_PREFIX,
29};
30use crate::kv_backend::txn::Txn;
31use crate::kv_backend::KvBackendRef;
32use crate::rpc::store::BatchGetRequest;
33
/// The encoded bytes of a VIEW's logical plan.
///
/// This is a plain byte vector; this module stores it as the `view_info`
/// field of `ViewInfoValue` without interpreting it.
type RawViewLogicalPlan = Vec<u8>;
36
/// The key under which the metadata of a view is stored.
///
/// The layout: `__view_info/{view_id}`.
///
/// The textual form is produced by the `Display` implementation (prefix,
/// a `/`, then the id) and is what `to_bytes` writes to the backend.
#[derive(Debug, PartialEq)]
pub struct ViewInfoKey {
    // Identifier of the view this key addresses.
    view_id: TableId,
}
44
45impl ViewInfoKey {
46    /// Returns a new `[ViewInfoKey]`.
47    pub fn new(view_id: TableId) -> Self {
48        Self { view_id }
49    }
50}
51
52impl Display for ViewInfoKey {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
55    }
56}
57
58impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
59    fn to_bytes(&self) -> Vec<u8> {
60        self.to_string().into_bytes()
61    }
62
63    fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
64        let key = std::str::from_utf8(bytes).map_err(|e| {
65            InvalidViewInfoSnafu {
66                err_msg: format!(
67                    "ViewInfoKey '{}' is not a valid UTF8 string: {e}",
68                    String::from_utf8_lossy(bytes)
69                ),
70            }
71            .build()
72        })?;
73        let captures =
74            VIEW_INFO_KEY_PATTERN
75                .captures(key)
76                .with_context(|| InvalidViewInfoSnafu {
77                    err_msg: format!("Invalid ViewInfoKey '{key}'"),
78                })?;
79        // Safety: pass the regex check above
80        let view_id = captures[1].parse::<TableId>().unwrap();
81        Ok(ViewInfoKey { view_id })
82    }
83}
84
/// The VIEW info value that keeps the metadata.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ViewInfoValue {
    /// The encoded logical plan.
    pub view_info: RawViewLogicalPlan,
    /// The fully resolved table names referenced in the logical plan.
    pub table_names: HashSet<TableName>,
    /// The view columns.
    pub columns: Vec<String>,
    /// The original plan columns.
    pub plan_columns: Vec<String>,
    /// The SQL used to create the view.
    pub definition: String,
    // Version counter: `new` starts it at 0 and `update` produces a value
    // whose version is one greater than the value it was called on.
    version: u64,
}
100
101impl ViewInfoValue {
102    pub fn new(
103        view_info: RawViewLogicalPlan,
104        table_names: HashSet<TableName>,
105        columns: Vec<String>,
106        plan_columns: Vec<String>,
107        definition: String,
108    ) -> Self {
109        Self {
110            view_info,
111            table_names,
112            columns,
113            plan_columns,
114            definition,
115            version: 0,
116        }
117    }
118
119    pub(crate) fn update(
120        &self,
121        new_view_info: RawViewLogicalPlan,
122        table_names: HashSet<TableName>,
123        columns: Vec<String>,
124        plan_columns: Vec<String>,
125        definition: String,
126    ) -> Self {
127        Self {
128            view_info: new_view_info,
129            table_names,
130            columns,
131            plan_columns,
132            definition,
133            version: self.version + 1,
134        }
135    }
136}
137
/// The manager of view info metadata.
///
/// It reads [`ViewInfoValue`]s through the wrapped key-value backend and
/// builds the transactions used to create or update them.
pub struct ViewInfoManager {
    // Backend used by `get`, `batch_get` and `batch_get_raw`.
    kv_backend: KvBackendRef,
}
142
/// A reference-counted, shareable handle to a [`ViewInfoManager`].
pub type ViewInfoManagerRef = Arc<ViewInfoManager>;

/// The result type produced by the decode closures that
/// `ViewInfoManager::build_create_txn` and `ViewInfoManager::build_update_txn`
/// return: an optional [`ViewInfoValue`] wrapped in `DeserializedValueWithBytes`,
/// or an error.
pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
146
147impl ViewInfoManager {
148    pub fn new(kv_backend: KvBackendRef) -> Self {
149        Self { kv_backend }
150    }
151
152    /// Builds a create view info transaction, it expected the `__view_info/{view_id}` wasn't occupied.
153    pub(crate) fn build_create_txn(
154        &self,
155        view_id: TableId,
156        view_info_value: &ViewInfoValue,
157    ) -> Result<(
158        Txn,
159        impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
160    )> {
161        let key = ViewInfoKey::new(view_id);
162        let raw_key = key.to_bytes();
163
164        let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);
165
166        Ok((
167            txn,
168            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
169        ))
170    }
171
172    /// Builds a update view info transaction, it expected the remote value equals the `current_current_view_info_value`.
173    /// It retrieves the latest value if the comparing failed.
174    pub(crate) fn build_update_txn(
175        &self,
176        view_id: TableId,
177        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
178        new_view_info_value: &ViewInfoValue,
179    ) -> Result<(
180        Txn,
181        impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
182    )> {
183        let key = ViewInfoKey::new(view_id);
184        let raw_key = key.to_bytes();
185        let raw_value = current_view_info_value.get_raw_bytes();
186        let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;
187
188        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
189
190        Ok((
191            txn,
192            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
193        ))
194    }
195
196    /// Get the `[ViewInfoValue]` by the view id
197    pub async fn get(
198        &self,
199        view_id: TableId,
200    ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
201        let key = ViewInfoKey::new(view_id);
202        let raw_key = key.to_bytes();
203        self.kv_backend
204            .get(&raw_key)
205            .await?
206            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
207            .transpose()
208    }
209
210    /// Get the `[ViewInfoValue]` by the view id slice in batch
211    pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
212        let lookup_table = view_ids
213            .iter()
214            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
215            .collect::<HashMap<_, _>>();
216
217        let resp = self
218            .kv_backend
219            .batch_get(BatchGetRequest {
220                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
221            })
222            .await?;
223
224        let values = resp
225            .kvs
226            .iter()
227            .map(|kv| {
228                Ok((
229                    // Safety: must exist.
230                    **lookup_table.get(kv.key()).unwrap(),
231                    ViewInfoValue::try_from_raw_value(&kv.value)?,
232                ))
233            })
234            .collect::<Result<HashMap<_, _>>>()?;
235
236        Ok(values)
237    }
238
239    /// Returns batch of `DeserializedValueWithBytes<ViewInfoValue>`.
240    pub async fn batch_get_raw(
241        &self,
242        view_ids: &[TableId],
243    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
244        let lookup_table = view_ids
245            .iter()
246            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
247            .collect::<HashMap<_, _>>();
248
249        let resp = self
250            .kv_backend
251            .batch_get(BatchGetRequest {
252                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
253            })
254            .await?;
255
256        let values = resp
257            .kvs
258            .iter()
259            .map(|kv| {
260                Ok((
261                    // Safety: must exist.
262                    **lookup_table.get(kv.key()).unwrap(),
263                    DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
264                ))
265            })
266            .collect::<Result<HashMap<_, _>>>()?;
267
268        Ok(values)
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `greptime.public.<table>` name used by the fixtures below.
    fn public_table(table: &str) -> TableName {
        TableName {
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            table_name: table.to_string(),
        }
    }

    /// A key must serialize to `__view_info/{view_id}`.
    #[test]
    fn test_key_serialization() {
        let raw_key = ViewInfoKey::new(42).to_bytes();
        assert_eq!(raw_key, b"__view_info/42");
    }

    /// Parsing the serialized form must yield the original key.
    #[test]
    fn test_key_deserialization() {
        let parsed = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
        let expected = ViewInfoKey::new(42);
        assert_eq!(parsed, expected);
    }

    /// A value must survive a serialize/deserialize round trip unchanged.
    #[test]
    fn test_value_serialization() {
        let table_names = HashSet::from([public_table("a_table"), public_table("b_table")]);

        let value = ViewInfoValue {
            view_info: vec![1, 2, 3],
            version: 1,
            table_names,
            columns: vec!["a".to_string()],
            plan_columns: vec!["number".to_string()],
            definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
        };
        let raw = value.try_as_raw_value().unwrap();
        let round_tripped = ViewInfoValue::try_from_raw_value(&raw).unwrap();
        assert_eq!(value, round_tripped);
    }
}