common_meta/key/flow/
flow_info.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use lazy_static::lazy_static;
20use regex::Regex;
21use serde::{Deserialize, Serialize};
22use snafu::OptionExt;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::FlownodeId;
27use crate::error::{self, Result};
28use crate::key::flow::FlowScoped;
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
33
34const FLOW_INFO_KEY_PREFIX: &str = "info";
35
36lazy_static! {
37 static ref FLOW_INFO_KEY_PATTERN: Regex =
38 Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
39}
40
41pub struct FlowInfoKey(FlowScoped<FlowInfoKeyInner>);
45
46pub type FlowInfoDecodeResult = Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>;
47
48impl<'a> MetadataKey<'a, FlowInfoKey> for FlowInfoKey {
49 fn to_bytes(&self) -> Vec<u8> {
50 self.0.to_bytes()
51 }
52
53 fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKey> {
54 Ok(FlowInfoKey(FlowScoped::<FlowInfoKeyInner>::from_bytes(
55 bytes,
56 )?))
57 }
58}
59
60impl FlowInfoKey {
61 pub fn new(flow_id: FlowId) -> FlowInfoKey {
63 let inner = FlowInfoKeyInner::new(flow_id);
64 FlowInfoKey(FlowScoped::new(inner))
65 }
66
67 pub fn flow_id(&self) -> FlowId {
69 self.0.flow_id
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq)]
75struct FlowInfoKeyInner {
76 flow_id: FlowId,
77}
78
79impl FlowInfoKeyInner {
80 pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
82 FlowInfoKeyInner { flow_id }
83 }
84}
85
86impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
87 fn to_bytes(&self) -> Vec<u8> {
88 format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes()
89 }
90
91 fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKeyInner> {
92 let key = std::str::from_utf8(bytes).map_err(|e| {
93 error::InvalidMetadataSnafu {
94 err_msg: format!(
95 "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
96 String::from_utf8_lossy(bytes)
97 ),
98 }
99 .build()
100 })?;
101 let captures =
102 FLOW_INFO_KEY_PATTERN
103 .captures(key)
104 .context(error::InvalidMetadataSnafu {
105 err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
106 })?;
107 let flow_id = captures[1].parse::<FlowId>().unwrap();
109 Ok(FlowInfoKeyInner { flow_id })
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
115pub struct FlowInfoValue {
116 pub source_table_ids: Vec<TableId>,
118 pub sink_table_name: TableName,
120 pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
122 pub catalog_name: String,
124 #[serde(default)]
130 pub query_context: Option<crate::rpc::ddl::QueryContext>,
131 pub flow_name: String,
133 pub raw_sql: String,
135 pub expire_after: Option<i64>,
138 #[serde(default)]
143 pub eval_interval_secs: Option<i64>,
144 pub comment: String,
146 pub options: HashMap<String, String>,
148 #[serde(default)]
150 pub created_time: DateTime<Utc>,
151 #[serde(default)]
153 pub updated_time: DateTime<Utc>,
154}
155
156impl FlowInfoValue {
157 pub fn flownode_ids(&self) -> &BTreeMap<FlowPartitionId, FlownodeId> {
159 &self.flownode_ids
160 }
161
162 pub fn insert_flownode_id(
164 &mut self,
165 partition: FlowPartitionId,
166 node: FlownodeId,
167 ) -> Option<FlownodeId> {
168 self.flownode_ids.insert(partition, node)
169 }
170
171 pub fn source_table_ids(&self) -> &[TableId] {
173 &self.source_table_ids
174 }
175
176 pub fn catalog_name(&self) -> &String {
177 &self.catalog_name
178 }
179
180 pub fn query_context(&self) -> &Option<crate::rpc::ddl::QueryContext> {
181 &self.query_context
182 }
183
184 pub fn flow_name(&self) -> &String {
185 &self.flow_name
186 }
187
188 pub fn sink_table_name(&self) -> &TableName {
189 &self.sink_table_name
190 }
191
192 pub fn raw_sql(&self) -> &String {
193 &self.raw_sql
194 }
195
196 pub fn expire_after(&self) -> Option<i64> {
197 self.expire_after
198 }
199
200 pub fn eval_interval(&self) -> Option<i64> {
201 self.eval_interval_secs
202 }
203
204 pub fn comment(&self) -> &String {
205 &self.comment
206 }
207
208 pub fn options(&self) -> &HashMap<String, String> {
209 &self.options
210 }
211
212 pub fn created_time(&self) -> &DateTime<Utc> {
213 &self.created_time
214 }
215
216 pub fn updated_time(&self) -> &DateTime<Utc> {
217 &self.updated_time
218 }
219}
220
221pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
222
223pub struct FlowInfoManager {
225 kv_backend: KvBackendRef,
226}
227
228impl FlowInfoManager {
229 pub fn new(kv_backend: KvBackendRef) -> Self {
231 Self { kv_backend }
232 }
233
234 pub async fn get(&self, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
236 let key = FlowInfoKey::new(flow_id).to_bytes();
237 self.kv_backend
238 .get(&key)
239 .await?
240 .map(|x| FlowInfoValue::try_from_raw_value(&x.value))
241 .transpose()
242 }
243
244 pub async fn get_raw(
246 &self,
247 flow_id: FlowId,
248 ) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
249 let key = FlowInfoKey::new(flow_id).to_bytes();
250 self.kv_backend
251 .get(&key)
252 .await?
253 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
254 .transpose()
255 }
256
257 pub(crate) fn build_create_txn(
261 &self,
262 flow_id: FlowId,
263 flow_value: &FlowInfoValue,
264 ) -> Result<(
265 Txn,
266 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
267 )> {
268 let key = FlowInfoKey::new(flow_id).to_bytes();
269 let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?);
270
271 Ok((
272 txn,
273 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
274 ))
275 }
276
277 pub(crate) fn build_update_txn(
282 &self,
283 flow_id: FlowId,
284 current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
285 new_flow_value: &FlowInfoValue,
286 ) -> Result<(
287 Txn,
288 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
289 )> {
290 let key = FlowInfoKey::new(flow_id).to_bytes();
291 let raw_value = new_flow_value.try_as_raw_value()?;
292 let prev_value = current_flow_value.get_raw_bytes();
293 let txn = Txn::new()
294 .when(vec![Compare::new(
295 key.clone(),
296 CompareOp::Equal,
297 Some(prev_value),
298 )])
299 .and_then(vec![TxnOp::Put(key.clone(), raw_value)])
300 .or_else(vec![TxnOp::Get(key.clone())]);
301
302 Ok((
303 txn,
304 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
305 ))
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// A flow info key serializes to `__flow/info/{flow_id}`.
    #[test]
    fn test_key_serialization() {
        let encoded = FlowInfoKey::new(2).to_bytes();
        assert_eq!(b"__flow/info/2".to_vec(), encoded);
    }

    /// Parsing a serialized key recovers the flow id.
    #[test]
    fn test_key_deserialization() {
        let raw: Vec<u8> = b"__flow/info/2".to_vec();
        let decoded = FlowInfoKey::from_bytes(&raw).unwrap();
        assert_eq!(decoded.flow_id(), 2);
    }
}
325}