use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use table::table_name::TableName;
use super::VIEW_INFO_KEY_PATTERN;
use crate::error::{InvalidViewInfoSnafu, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, MetadataKey, MetadataValue, VIEW_INFO_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
type RawViewLogicalPlan = Vec<u8>;
#[derive(Debug, PartialEq)]
pub struct ViewInfoKey {
view_id: TableId,
}
impl ViewInfoKey {
pub fn new(view_id: TableId) -> Self {
Self { view_id }
}
}
impl Display for ViewInfoKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
}
}
impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
let key = std::str::from_utf8(bytes).map_err(|e| {
InvalidViewInfoSnafu {
err_msg: format!(
"ViewInfoKey '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures = VIEW_INFO_KEY_PATTERN
.captures(key)
.context(InvalidViewInfoSnafu {
err_msg: format!("Invalid ViewInfoKey '{key}'"),
})?;
let view_id = captures[1].parse::<TableId>().unwrap();
Ok(ViewInfoKey { view_id })
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ViewInfoValue {
pub view_info: RawViewLogicalPlan,
pub table_names: HashSet<TableName>,
pub columns: Vec<String>,
pub plan_columns: Vec<String>,
pub definition: String,
version: u64,
}
impl ViewInfoValue {
pub fn new(
view_info: RawViewLogicalPlan,
table_names: HashSet<TableName>,
columns: Vec<String>,
plan_columns: Vec<String>,
definition: String,
) -> Self {
Self {
view_info,
table_names,
columns,
plan_columns,
definition,
version: 0,
}
}
pub(crate) fn update(
&self,
new_view_info: RawViewLogicalPlan,
table_names: HashSet<TableName>,
columns: Vec<String>,
plan_columns: Vec<String>,
definition: String,
) -> Self {
Self {
view_info: new_view_info,
table_names,
columns,
plan_columns,
definition,
version: self.version + 1,
}
}
}
pub struct ViewInfoManager {
kv_backend: KvBackendRef,
}
pub type ViewInfoManagerRef = Arc<ViewInfoManager>;
pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
impl ViewInfoManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
pub(crate) fn build_create_txn(
&self,
view_id: TableId,
view_info_value: &ViewInfoValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
)> {
let key = ViewInfoKey::new(view_id);
let raw_key = key.to_bytes();
let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
pub(crate) fn build_update_txn(
&self,
view_id: TableId,
current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
new_view_info_value: &ViewInfoValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
)> {
let key = ViewInfoKey::new(view_id);
let raw_key = key.to_bytes();
let raw_value = current_view_info_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;
let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
pub async fn get(
&self,
view_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
let key = ViewInfoKey::new(view_id);
let raw_key = key.to_bytes();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
let lookup_table = view_ids
.iter()
.map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
.collect::<HashMap<_, _>>();
let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;
let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
**lookup_table.get(kv.key()).unwrap(),
ViewInfoValue::try_from_raw_value(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;
Ok(values)
}
pub async fn batch_get_raw(
&self,
view_ids: &[TableId],
) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
let lookup_table = view_ids
.iter()
.map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
.collect::<HashMap<_, _>>();
let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;
let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
**lookup_table.get(kv.key()).unwrap(),
DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;
Ok(values)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_key_serialization() {
let key = ViewInfoKey::new(42);
let raw_key = key.to_bytes();
assert_eq!(raw_key, b"__view_info/42");
}
#[test]
fn test_key_deserialization() {
let expected = ViewInfoKey::new(42);
let key = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
assert_eq!(key, expected);
}
#[test]
fn test_value_serialization() {
let table_names = {
let mut set = HashSet::new();
set.insert(TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "a_table".to_string(),
});
set.insert(TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "b_table".to_string(),
});
set
};
let value = ViewInfoValue {
view_info: vec![1, 2, 3],
version: 1,
table_names,
columns: vec!["a".to_string()],
plan_columns: vec!["number".to_string()],
definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
};
let serialized = value.try_as_raw_value().unwrap();
let deserialized = ViewInfoValue::try_from_raw_value(&serialized).unwrap();
assert_eq!(value, deserialized);
}
}