1use common_telemetry::warn;
16use futures::stream::BoxStream;
17use lazy_static::lazy_static;
18use regex::Regex;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21
22use crate::error::{self, Result};
23use crate::key::flow::FlowScoped;
24use crate::key::txn_helper::TxnOpGetResponseSet;
25use crate::key::{
26 BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN,
27};
28use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
29use crate::kv_backend::KvBackendRef;
30use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
31use crate::rpc::store::RangeRequest;
32use crate::rpc::KeyValue;
33
34const FLOW_NAME_KEY_PREFIX: &str = "name";
35
36lazy_static! {
37 static ref FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
38 "^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
39 ))
40 .unwrap();
41
42 static ref COMPAT_FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
44 "^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/(.*)$"
45 ))
46 .unwrap();
47}
48
49pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
53
54pub type FlowNameDecodeResult = Result<Option<DeserializedValueWithBytes<FlowNameValue>>>;
55
56#[allow(dead_code)]
57impl<'a> FlowNameKey<'a> {
58 pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> {
60 let inner = FlowNameKeyInner::new(catalog, flow_name);
61 FlowNameKey(FlowScoped::new(inner))
62 }
63
64 pub fn range_start_key(catalog: &str) -> Vec<u8> {
65 let inner = BytesAdapter::from(Self::prefix(catalog).into_bytes());
66
67 FlowScoped::new(inner).to_bytes()
68 }
69
70 pub fn prefix(catalog: &str) -> String {
72 format!("{}/{}/", FLOW_NAME_KEY_PREFIX, catalog)
73 }
74
75 pub fn catalog(&self) -> &str {
77 self.0.catalog_name
78 }
79
80 pub fn flow_name(&self) -> &str {
82 self.0.flow_name
83 }
84}
85
86impl<'a> MetadataKey<'a, FlowNameKey<'a>> for FlowNameKey<'a> {
87 fn to_bytes(&self) -> Vec<u8> {
88 self.0.to_bytes()
89 }
90
91 fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKey<'a>> {
92 Ok(FlowNameKey(FlowScoped::<FlowNameKeyInner>::from_bytes(
93 bytes,
94 )?))
95 }
96}
97
/// The unscoped part of a flow name key, borrowing both of its components.
///
/// It is serialized as `name/{catalog_name}/{flow_name}`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FlowNameKeyInner<'a> {
    /// Name of the catalog the flow belongs to.
    pub catalog_name: &'a str,
    /// Name of the flow.
    pub flow_name: &'a str,
}
104
105impl<'a> MetadataKey<'a, FlowNameKeyInner<'a>> for FlowNameKeyInner<'_> {
106 fn to_bytes(&self) -> Vec<u8> {
107 format!(
108 "{FLOW_NAME_KEY_PREFIX}/{}/{}",
109 self.catalog_name, self.flow_name
110 )
111 .into_bytes()
112 }
113
114 fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKeyInner<'a>> {
115 let key = std::str::from_utf8(bytes).map_err(|e| {
116 error::InvalidMetadataSnafu {
117 err_msg: format!(
118 "FlowNameKeyInner '{}' is not a valid UTF8 string: {e}",
119 String::from_utf8_lossy(bytes)
120 ),
121 }
122 .build()
123 })?;
124 let captures = FLOW_NAME_KEY_PATTERN
125 .captures(key)
126 .or_else(|| {
127 warn!(
128 "FlowNameKeyInner '{}' is not a valid flow name in newer version.",
129 key
130 );
131 COMPAT_FLOW_NAME_KEY_PATTERN.captures(key)
132 })
133 .context(error::InvalidMetadataSnafu {
134 err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
135 })?;
136 let catalog_name = captures.get(1).unwrap().as_str();
138 let flow_name = captures.get(2).unwrap().as_str();
139 Ok(FlowNameKeyInner {
140 catalog_name,
141 flow_name,
142 })
143 }
144}
145
146impl<'a> FlowNameKeyInner<'a> {
147 pub fn new(catalog_name: &'a str, flow_name: &'a str) -> Self {
149 Self {
150 catalog_name,
151 flow_name,
152 }
153 }
154}
155
156#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
158pub struct FlowNameValue {
159 flow_id: FlowId,
160}
161
162impl FlowNameValue {
163 pub fn new(flow_id: FlowId) -> Self {
165 Self { flow_id }
166 }
167
168 pub fn flow_id(&self) -> FlowId {
170 self.flow_id
171 }
172}
173
174pub fn flow_name_decoder(kv: KeyValue) -> Result<(String, FlowNameValue)> {
175 let flow_name = FlowNameKey::from_bytes(&kv.key)?;
176 let flow_id = FlowNameValue::try_from_raw_value(&kv.value)?;
177 Ok((flow_name.flow_name().to_string(), flow_id))
178}
179
180pub struct FlowNameManager {
182 kv_backend: KvBackendRef,
183}
184
185impl FlowNameManager {
186 pub fn new(kv_backend: KvBackendRef) -> Self {
188 Self { kv_backend }
189 }
190
191 pub async fn get(&self, catalog: &str, flow: &str) -> Result<Option<FlowNameValue>> {
193 let key = FlowNameKey::new(catalog, flow);
194 let raw_key = key.to_bytes();
195 self.kv_backend
196 .get(&raw_key)
197 .await?
198 .map(|x| FlowNameValue::try_from_raw_value(&x.value))
199 .transpose()
200 }
201
202 pub async fn flow_names(
204 &self,
205 catalog: &str,
206 ) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
207 let start_key = FlowNameKey::range_start_key(catalog);
208 common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
209 let req = RangeRequest::new().with_prefix(start_key);
210
211 let stream = PaginationStream::new(
212 self.kv_backend.clone(),
213 req,
214 DEFAULT_PAGE_SIZE,
215 flow_name_decoder,
216 )
217 .into_stream();
218
219 Box::pin(stream)
220 }
221
222 pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
224 let key = FlowNameKey::new(catalog, flow);
225 let raw_key = key.to_bytes();
226 self.kv_backend.exists(&raw_key).await
227 }
228
229 pub fn build_create_txn(
233 &self,
234 catalog_name: &str,
235 flow_name: &str,
236 flow_id: FlowId,
237 ) -> Result<(
238 Txn,
239 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
240 )> {
241 let key = FlowNameKey::new(catalog_name, flow_name);
242 let raw_key = key.to_bytes();
243 let flow_flow_name_value = FlowNameValue::new(flow_id);
244 let txn = Txn::put_if_not_exists(raw_key.clone(), flow_flow_name_value.try_as_raw_value()?);
245
246 Ok((
247 txn,
248 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
249 ))
250 }
251
252 pub fn build_update_txn(
257 &self,
258 catalog_name: &str,
259 flow_name: &str,
260 flow_id: FlowId,
261 ) -> Result<(
262 Txn,
263 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
264 )> {
265 let key = FlowNameKey::new(catalog_name, flow_name);
266 let raw_key = key.to_bytes();
267 let flow_flow_name_value = FlowNameValue::new(flow_id);
268 let raw_value = flow_flow_name_value.try_as_raw_value()?;
269 let txn = Txn::new()
270 .when(vec![Compare::new(
271 raw_key.clone(),
272 CompareOp::Equal,
273 Some(raw_value),
274 )])
275 .or_else(vec![TxnOp::Get(raw_key.clone())]);
276
277 Ok((
278 txn,
279 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
280 ))
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes `raw` as a [`FlowNameKey`] and checks both of its components.
    fn assert_decodes_to(raw: &[u8], catalog: &str, flow_name: &str) {
        let key = FlowNameKey::from_bytes(raw).unwrap();
        assert_eq!(key.catalog(), catalog);
        assert_eq!(key.flow_name(), flow_name);
    }

    #[test]
    fn test_key_serialization() {
        let encoded = FlowNameKey::new("my_catalog", "my_task").to_bytes();
        assert_eq!(b"__flow/name/my_catalog/my_task".to_vec(), encoded);
    }

    #[test]
    fn test_key_deserialization() {
        // A name accepted by the strict pattern.
        assert_decodes_to(
            b"__flow/name/my_catalog/my_task",
            "my_catalog",
            "my_task",
        );

        // A name that is only accepted by the lenient compatibility pattern.
        assert_decodes_to(b"__flow/name/my_catalog/a/`b`", "my_catalog", "a/`b`");
    }

    #[test]
    fn test_key_start_range() {
        let start_key = FlowNameKey::range_start_key("greptime");
        assert_eq!(b"__flow/name/greptime/".to_vec(), start_key);
    }
}