common_meta/key/flow/
flow_info.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use lazy_static::lazy_static;
20use regex::Regex;
21use serde::{Deserialize, Serialize};
22use snafu::OptionExt;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::error::{self, Result};
27use crate::key::flow::FlowScoped;
28use crate::key::txn_helper::TxnOpGetResponseSet;
29use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
30use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::FlownodeId;
33
34const FLOW_INFO_KEY_PREFIX: &str = "info";
35
36lazy_static! {
37 static ref FLOW_INFO_KEY_PATTERN: Regex =
38 Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
39}
40
41pub struct FlowInfoKey(FlowScoped<FlowInfoKeyInner>);
45
46pub type FlowInfoDecodeResult = Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>;
47
48impl<'a> MetadataKey<'a, FlowInfoKey> for FlowInfoKey {
49 fn to_bytes(&self) -> Vec<u8> {
50 self.0.to_bytes()
51 }
52
53 fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKey> {
54 Ok(FlowInfoKey(FlowScoped::<FlowInfoKeyInner>::from_bytes(
55 bytes,
56 )?))
57 }
58}
59
60impl FlowInfoKey {
61 pub fn new(flow_id: FlowId) -> FlowInfoKey {
63 let inner = FlowInfoKeyInner::new(flow_id);
64 FlowInfoKey(FlowScoped::new(inner))
65 }
66
67 pub fn flow_id(&self) -> FlowId {
69 self.0.flow_id
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq)]
75struct FlowInfoKeyInner {
76 flow_id: FlowId,
77}
78
79impl FlowInfoKeyInner {
80 pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
82 FlowInfoKeyInner { flow_id }
83 }
84}
85
86impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
87 fn to_bytes(&self) -> Vec<u8> {
88 format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes()
89 }
90
91 fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKeyInner> {
92 let key = std::str::from_utf8(bytes).map_err(|e| {
93 error::InvalidMetadataSnafu {
94 err_msg: format!(
95 "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
96 String::from_utf8_lossy(bytes)
97 ),
98 }
99 .build()
100 })?;
101 let captures =
102 FLOW_INFO_KEY_PATTERN
103 .captures(key)
104 .context(error::InvalidMetadataSnafu {
105 err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
106 })?;
107 let flow_id = captures[1].parse::<FlowId>().unwrap();
109 Ok(FlowInfoKeyInner { flow_id })
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
115pub struct FlowInfoValue {
116 pub source_table_ids: Vec<TableId>,
118 pub sink_table_name: TableName,
120 pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
122 pub catalog_name: String,
124 #[serde(default)]
130 pub query_context: Option<crate::rpc::ddl::QueryContext>,
131 pub flow_name: String,
133 pub raw_sql: String,
135 pub expire_after: Option<i64>,
138 pub comment: String,
140 pub options: HashMap<String, String>,
142 #[serde(default)]
144 pub created_time: DateTime<Utc>,
145 #[serde(default)]
147 pub updated_time: DateTime<Utc>,
148}
149
150impl FlowInfoValue {
151 pub fn flownode_ids(&self) -> &BTreeMap<FlowPartitionId, FlownodeId> {
153 &self.flownode_ids
154 }
155
156 pub fn insert_flownode_id(
158 &mut self,
159 partition: FlowPartitionId,
160 node: FlownodeId,
161 ) -> Option<FlownodeId> {
162 self.flownode_ids.insert(partition, node)
163 }
164
165 pub fn source_table_ids(&self) -> &[TableId] {
167 &self.source_table_ids
168 }
169
170 pub fn catalog_name(&self) -> &String {
171 &self.catalog_name
172 }
173
174 pub fn query_context(&self) -> &Option<crate::rpc::ddl::QueryContext> {
175 &self.query_context
176 }
177
178 pub fn flow_name(&self) -> &String {
179 &self.flow_name
180 }
181
182 pub fn sink_table_name(&self) -> &TableName {
183 &self.sink_table_name
184 }
185
186 pub fn raw_sql(&self) -> &String {
187 &self.raw_sql
188 }
189
190 pub fn expire_after(&self) -> Option<i64> {
191 self.expire_after
192 }
193
194 pub fn comment(&self) -> &String {
195 &self.comment
196 }
197
198 pub fn options(&self) -> &HashMap<String, String> {
199 &self.options
200 }
201
202 pub fn created_time(&self) -> &DateTime<Utc> {
203 &self.created_time
204 }
205
206 pub fn updated_time(&self) -> &DateTime<Utc> {
207 &self.updated_time
208 }
209}
210
211pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
212
213pub struct FlowInfoManager {
215 kv_backend: KvBackendRef,
216}
217
218impl FlowInfoManager {
219 pub fn new(kv_backend: KvBackendRef) -> Self {
221 Self { kv_backend }
222 }
223
224 pub async fn get(&self, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
226 let key = FlowInfoKey::new(flow_id).to_bytes();
227 self.kv_backend
228 .get(&key)
229 .await?
230 .map(|x| FlowInfoValue::try_from_raw_value(&x.value))
231 .transpose()
232 }
233
234 pub async fn get_raw(
236 &self,
237 flow_id: FlowId,
238 ) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
239 let key = FlowInfoKey::new(flow_id).to_bytes();
240 self.kv_backend
241 .get(&key)
242 .await?
243 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
244 .transpose()
245 }
246
247 pub(crate) fn build_create_txn(
251 &self,
252 flow_id: FlowId,
253 flow_value: &FlowInfoValue,
254 ) -> Result<(
255 Txn,
256 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
257 )> {
258 let key = FlowInfoKey::new(flow_id).to_bytes();
259 let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?);
260
261 Ok((
262 txn,
263 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
264 ))
265 }
266
267 pub(crate) fn build_update_txn(
272 &self,
273 flow_id: FlowId,
274 current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
275 new_flow_value: &FlowInfoValue,
276 ) -> Result<(
277 Txn,
278 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
279 )> {
280 let key = FlowInfoKey::new(flow_id).to_bytes();
281 let raw_value = new_flow_value.try_as_raw_value()?;
282 let prev_value = current_flow_value.get_raw_bytes();
283 let txn = Txn::new()
284 .when(vec![Compare::new(
285 key.clone(),
286 CompareOp::Equal,
287 Some(prev_value),
288 )])
289 .and_then(vec![TxnOp::Put(key.clone(), raw_value)])
290 .or_else(vec![TxnOp::Get(key.clone())]);
291
292 Ok((
293 txn,
294 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
295 ))
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302
303 #[test]
304 fn test_key_serialization() {
305 let flow_info = FlowInfoKey::new(2);
306 assert_eq!(b"__flow/info/2".to_vec(), flow_info.to_bytes());
307 }
308
309 #[test]
310 fn test_key_deserialization() {
311 let bytes = b"__flow/info/2".to_vec();
312 let key = FlowInfoKey::from_bytes(&bytes).unwrap();
313 assert_eq!(key.flow_id(), 2);
314 }
315}