common_meta/key/flow/
flow_info.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use lazy_static::lazy_static;
20use regex::Regex;
21use serde::{Deserialize, Serialize};
22use snafu::OptionExt;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::error::{self, Result};
27use crate::key::flow::FlowScoped;
28use crate::key::txn_helper::TxnOpGetResponseSet;
29use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
30use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::FlownodeId;
33
/// Leading segment of the inner flow info key, which is laid out as `info/{flow_id}`.
const FLOW_INFO_KEY_PREFIX: &str = "info";
35
36lazy_static! {
37 static ref FLOW_INFO_KEY_PATTERN: Regex =
38 Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
39}
40
41pub struct FlowInfoKey(FlowScoped<FlowInfoKeyInner>);
45
46pub type FlowInfoDecodeResult = Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>;
47
48impl<'a> MetadataKey<'a, FlowInfoKey> for FlowInfoKey {
49 fn to_bytes(&self) -> Vec<u8> {
50 self.0.to_bytes()
51 }
52
53 fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKey> {
54 Ok(FlowInfoKey(FlowScoped::<FlowInfoKeyInner>::from_bytes(
55 bytes,
56 )?))
57 }
58}
59
60impl FlowInfoKey {
61 pub fn new(flow_id: FlowId) -> FlowInfoKey {
63 let inner = FlowInfoKeyInner::new(flow_id);
64 FlowInfoKey(FlowScoped::new(inner))
65 }
66
67 pub fn flow_id(&self) -> FlowId {
69 self.0.flow_id
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq)]
75struct FlowInfoKeyInner {
76 flow_id: FlowId,
77}
78
79impl FlowInfoKeyInner {
80 pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
82 FlowInfoKeyInner { flow_id }
83 }
84}
85
86impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
87 fn to_bytes(&self) -> Vec<u8> {
88 format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes()
89 }
90
91 fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKeyInner> {
92 let key = std::str::from_utf8(bytes).map_err(|e| {
93 error::InvalidMetadataSnafu {
94 err_msg: format!(
95 "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
96 String::from_utf8_lossy(bytes)
97 ),
98 }
99 .build()
100 })?;
101 let captures =
102 FLOW_INFO_KEY_PATTERN
103 .captures(key)
104 .context(error::InvalidMetadataSnafu {
105 err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
106 })?;
107 let flow_id = captures[1].parse::<FlowId>().unwrap();
109 Ok(FlowInfoKeyInner { flow_id })
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
115pub struct FlowInfoValue {
116 pub(crate) source_table_ids: Vec<TableId>,
118 pub(crate) sink_table_name: TableName,
120 pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
122 pub(crate) catalog_name: String,
124 #[serde(default)]
130 pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
131 pub(crate) flow_name: String,
133 pub(crate) raw_sql: String,
135 pub(crate) expire_after: Option<i64>,
138 pub(crate) comment: String,
140 pub(crate) options: HashMap<String, String>,
142 #[serde(default)]
144 pub(crate) created_time: DateTime<Utc>,
145 #[serde(default)]
147 pub(crate) updated_time: DateTime<Utc>,
148}
149
150impl FlowInfoValue {
151 pub fn flownode_ids(&self) -> &BTreeMap<FlowPartitionId, FlownodeId> {
153 &self.flownode_ids
154 }
155
156 pub fn source_table_ids(&self) -> &[TableId] {
158 &self.source_table_ids
159 }
160
161 pub fn catalog_name(&self) -> &String {
162 &self.catalog_name
163 }
164
165 pub fn query_context(&self) -> &Option<crate::rpc::ddl::QueryContext> {
166 &self.query_context
167 }
168
169 pub fn flow_name(&self) -> &String {
170 &self.flow_name
171 }
172
173 pub fn sink_table_name(&self) -> &TableName {
174 &self.sink_table_name
175 }
176
177 pub fn raw_sql(&self) -> &String {
178 &self.raw_sql
179 }
180
181 pub fn expire_after(&self) -> Option<i64> {
182 self.expire_after
183 }
184
185 pub fn comment(&self) -> &String {
186 &self.comment
187 }
188
189 pub fn options(&self) -> &HashMap<String, String> {
190 &self.options
191 }
192
193 pub fn created_time(&self) -> &DateTime<Utc> {
194 &self.created_time
195 }
196
197 pub fn updated_time(&self) -> &DateTime<Utc> {
198 &self.updated_time
199 }
200}
201
202pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
203
204pub struct FlowInfoManager {
206 kv_backend: KvBackendRef,
207}
208
209impl FlowInfoManager {
210 pub fn new(kv_backend: KvBackendRef) -> Self {
212 Self { kv_backend }
213 }
214
215 pub async fn get(&self, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
217 let key = FlowInfoKey::new(flow_id).to_bytes();
218 self.kv_backend
219 .get(&key)
220 .await?
221 .map(|x| FlowInfoValue::try_from_raw_value(&x.value))
222 .transpose()
223 }
224
225 pub async fn get_raw(
227 &self,
228 flow_id: FlowId,
229 ) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
230 let key = FlowInfoKey::new(flow_id).to_bytes();
231 self.kv_backend
232 .get(&key)
233 .await?
234 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
235 .transpose()
236 }
237
238 pub(crate) fn build_create_txn(
242 &self,
243 flow_id: FlowId,
244 flow_value: &FlowInfoValue,
245 ) -> Result<(
246 Txn,
247 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
248 )> {
249 let key = FlowInfoKey::new(flow_id).to_bytes();
250 let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?);
251
252 Ok((
253 txn,
254 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
255 ))
256 }
257
258 pub(crate) fn build_update_txn(
263 &self,
264 flow_id: FlowId,
265 current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
266 new_flow_value: &FlowInfoValue,
267 ) -> Result<(
268 Txn,
269 impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
270 )> {
271 let key = FlowInfoKey::new(flow_id).to_bytes();
272 let raw_value = new_flow_value.try_as_raw_value()?;
273 let prev_value = current_flow_value.get_raw_bytes();
274 let txn = Txn::new()
275 .when(vec![
276 Compare::new(key.clone(), CompareOp::NotEqual, None),
277 Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
278 ])
279 .and_then(vec![TxnOp::Put(key.clone(), raw_value)])
280 .or_else(vec![TxnOp::Get(key.clone())]);
281
282 Ok((
283 txn,
284 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
285 ))
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// A key for flow id 2 serializes to the scoped layout `__flow/info/2`.
    #[test]
    fn test_key_serialization() {
        let encoded = FlowInfoKey::new(2).to_bytes();
        assert_eq!(b"__flow/info/2".to_vec(), encoded);
    }

    /// Parsing `__flow/info/2` yields a key whose flow id is 2.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow/info/2".to_vec();
        let decoded = FlowInfoKey::from_bytes(&raw).unwrap();
        assert_eq!(decoded.flow_id(), 2);
    }
}