use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN,
};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
/// The first path segment of every flow name key (before the flow scope
/// prefix is applied): `name/{catalog}/{flow_name}`.
const FLOW_NAME_KEY_PREFIX: &str = "name";
// Anchored pattern used to parse an (unscoped) flow name key.
// Capture group 1 is the catalog name and capture group 2 is the flow name;
// both segments must match `NAME_PATTERN`.
// The `unwrap` only fires if the pattern itself fails to compile.
lazy_static! {
static ref FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
"^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
))
.unwrap();
}
/// The key that maps a flow name to its flow id.
///
/// It wraps a [`FlowNameKeyInner`] in a [`FlowScoped`]; the serialized layout
/// is `__flow/name/{catalog}/{flow_name}`.
pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
/// The return type of the decoder closures handed out together with the
/// transactions built by `FlowNameManager::build_create_txn` and
/// `FlowNameManager::build_update_txn`.
pub type FlowNameDecodeResult = Result<Option<DeserializedValueWithBytes<FlowNameValue>>>;
// NOTE(review): presumably some of these helpers are not referenced outside
// of tests yet, hence the lint suppression — confirm before removing it.
#[allow(dead_code)]
impl<'a> FlowNameKey<'a> {
    /// Returns the [`FlowNameKey`] addressing `flow_name` inside `catalog`.
    pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> {
        FlowNameKey(FlowScoped::new(FlowNameKeyInner::new(catalog, flow_name)))
    }

    /// Returns the scoped byte prefix shared by every flow name key of
    /// `catalog`, e.g. `__flow/name/greptime/`.
    pub fn range_start_key(catalog: &str) -> Vec<u8> {
        let prefix_bytes = Self::prefix(catalog).into_bytes();
        FlowScoped::new(BytesAdapter::from(prefix_bytes)).to_bytes()
    }

    /// Returns the unscoped key prefix `name/{catalog}/` (note the trailing
    /// slash).
    pub fn prefix(catalog: &str) -> String {
        format!("{}/{}/", FLOW_NAME_KEY_PREFIX, catalog)
    }

    /// Returns the catalog name stored in this key.
    pub fn catalog(&self) -> &str {
        let scoped = &self.0;
        scoped.catalog_name
    }

    /// Returns the flow name stored in this key.
    pub fn flow_name(&self) -> &str {
        let scoped = &self.0;
        scoped.flow_name
    }
}
impl<'a> MetadataKey<'a, FlowNameKey<'a>> for FlowNameKey<'a> {
    /// Serializes the key by delegating to the wrapped [`FlowScoped`] value.
    fn to_bytes(&self) -> Vec<u8> {
        self.0.to_bytes()
    }

    /// Parses a scoped flow name key from `bytes`.
    ///
    /// Any error reported by [`FlowScoped`]'s parser is returned unchanged.
    fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKey<'a>> {
        let scoped = FlowScoped::<FlowNameKeyInner>::from_bytes(bytes)?;
        Ok(FlowNameKey(scoped))
    }
}
/// The unscoped part of a flow name key: `name/{catalog_name}/{flow_name}`.
///
/// Both fields borrow from the caller (or from the raw key bytes when the
/// value was produced by `from_bytes`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowNameKeyInner<'a> {
/// The catalog the flow belongs to.
pub catalog_name: &'a str,
/// The name of the flow.
pub flow_name: &'a str,
}
impl<'a> MetadataKey<'a, FlowNameKeyInner<'a>> for FlowNameKeyInner<'_> {
    /// Serializes the key as the UTF-8 bytes of
    /// `name/{catalog_name}/{flow_name}`.
    fn to_bytes(&self) -> Vec<u8> {
        format!(
            "{FLOW_NAME_KEY_PREFIX}/{}/{}",
            self.catalog_name, self.flow_name
        )
        .into_bytes()
    }

    /// Parses an unscoped flow name key from `bytes`.
    ///
    /// The returned value borrows the catalog and flow names directly from
    /// `bytes`.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidMetadata` error when `bytes` is not valid UTF-8 or
    /// when it does not match `FLOW_NAME_KEY_PATTERN`.
    fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKeyInner<'a>> {
        let key = std::str::from_utf8(bytes).map_err(|e| {
            error::InvalidMetadataSnafu {
                err_msg: format!(
                    "FlowNameKeyInner '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(bytes)
                ),
            }
            .build()
        })?;
        // Build the error lazily: this parser runs for every key yielded by a
        // range scan, so the message must not be formatted on the success path.
        let captures = FLOW_NAME_KEY_PATTERN.captures(key).with_context(|| {
            error::InvalidMetadataSnafu {
                err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
            }
        })?;
        // Groups 1 and 2 are mandatory parts of the pattern, so they are
        // always present once the pattern has matched.
        let catalog_name = captures.get(1).unwrap().as_str();
        let flow_name = captures.get(2).unwrap().as_str();
        Ok(FlowNameKeyInner {
            catalog_name,
            flow_name,
        })
    }
}
impl<'a> FlowNameKeyInner<'a> {
pub fn new(catalog_name: &'a str, flow_name: &'a str) -> Self {
Self {
catalog_name,
flow_name,
}
}
}
/// The value stored under a [`FlowNameKey`]: the id of the named flow.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlowNameValue {
/// The id of the flow this name refers to.
flow_id: FlowId,
}
impl FlowNameValue {
pub fn new(flow_id: FlowId) -> Self {
Self { flow_id }
}
pub fn flow_id(&self) -> FlowId {
self.flow_id
}
}
/// Decodes a raw key-value pair into `(flow_name, FlowNameValue)`.
///
/// The key is parsed as a [`FlowNameKey`] first, then the value as a
/// [`FlowNameValue`]; the first failure is returned. Only the flow name part
/// of the key is kept, as an owned `String`.
pub fn flow_name_decoder(kv: KeyValue) -> Result<(String, FlowNameValue)> {
    let parsed_key = FlowNameKey::from_bytes(&kv.key)?;
    let parsed_value = FlowNameValue::try_from_raw_value(&kv.value)?;
    let name = parsed_key.flow_name().to_string();
    Ok((name, parsed_value))
}
/// The manager of [`FlowNameKey`] entries, reading them from (and building
/// transactions against) a key-value backend.
pub struct FlowNameManager {
/// The backend every lookup and range scan is issued against.
kv_backend: KvBackendRef,
}
impl FlowNameManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
pub async fn get(&self, catalog: &str, flow: &str) -> Result<Option<FlowNameValue>> {
let key = FlowNameKey::new(catalog, flow);
let raw_key = key.to_bytes();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| FlowNameValue::try_from_raw_value(&x.value))
.transpose()
}
pub async fn flow_names(
&self,
catalog: &str,
) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
let start_key = FlowNameKey::range_start_key(catalog);
common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
flow_name_decoder,
)
.into_stream();
Box::pin(stream)
}
pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
let key = FlowNameKey::new(catalog, flow);
let raw_key = key.to_bytes();
self.kv_backend.exists(&raw_key).await
}
pub fn build_create_txn(
&self,
catalog_name: &str,
flow_name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
)> {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let txn = Txn::put_if_not_exists(raw_key.clone(), flow_flow_name_value.try_as_raw_value()?);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
pub fn build_update_txn(
&self,
catalog_name: &str,
flow_name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
)> {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let raw_value = flow_flow_name_value.try_as_raw_value()?;
let txn = Txn::new()
.when(vec![Compare::new(
raw_key.clone(),
CompareOp::Equal,
Some(raw_value),
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A key serializes to `__flow/name/{catalog}/{flow_name}`.
    #[test]
    fn test_key_serialization() {
        let encoded = FlowNameKey::new("my_catalog", "my_task").to_bytes();
        let expected = b"__flow/name/my_catalog/my_task".to_vec();
        assert_eq!(expected, encoded);
    }

    /// Parsing a serialized key yields the original catalog and flow names.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow/name/my_catalog/my_task".to_vec();
        let decoded = FlowNameKey::from_bytes(&raw).unwrap();
        assert_eq!(decoded.catalog(), "my_catalog");
        assert_eq!(decoded.flow_name(), "my_task");
    }

    /// The range start key is the scoped catalog prefix with a trailing slash.
    #[test]
    fn test_key_start_range() {
        let expected = b"__flow/name/greptime/".to_vec();
        let start_key = FlowNameKey::range_start_key("greptime");
        assert_eq!(expected, start_key);
    }
}