common_meta/key/flow/
flownode_flow.rs1use futures::stream::BoxStream;
16use futures::TryStreamExt;
17use lazy_static::lazy_static;
18use regex::Regex;
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::flow_info::FlowInfoValue;
23use crate::key::flow::FlowScoped;
24use crate::key::{BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
25use crate::kv_backend::txn::{Txn, TxnOp};
26use crate::kv_backend::KvBackendRef;
27use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
28use crate::rpc::store::RangeRequest;
29use crate::rpc::KeyValue;
30use crate::FlownodeId;
31
32lazy_static! {
33 static ref FLOWNODE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
34 "^{FLOWNODE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
35 ))
36 .unwrap();
37}
38
39const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode";
40
41pub struct FlownodeFlowKey(FlowScoped<FlownodeFlowKeyInner>);
45
46impl<'a> MetadataKey<'a, FlownodeFlowKey> for FlownodeFlowKey {
47 fn to_bytes(&self) -> Vec<u8> {
48 self.0.to_bytes()
49 }
50
51 fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKey> {
52 Ok(FlownodeFlowKey(
53 FlowScoped::<FlownodeFlowKeyInner>::from_bytes(bytes)?,
54 ))
55 }
56}
57
58impl FlownodeFlowKey {
59 pub fn new(
61 flownode_id: FlownodeId,
62 flow_id: FlowId,
63 partition_id: FlowPartitionId,
64 ) -> FlownodeFlowKey {
65 let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
66 FlownodeFlowKey(FlowScoped::new(inner))
67 }
68
69 pub fn range_start_key(flownode_id: FlownodeId) -> Vec<u8> {
71 let inner = BytesAdapter::from(FlownodeFlowKeyInner::prefix(flownode_id).into_bytes());
72
73 FlowScoped::new(inner).to_bytes()
74 }
75
76 pub fn flow_id(&self) -> FlowId {
78 self.0.flow_id
79 }
80
81 #[cfg(test)]
82 pub fn flownode_id(&self) -> FlownodeId {
84 self.0.flownode_id
85 }
86
87 pub fn partition_id(&self) -> FlowPartitionId {
89 self.0.partition_id
90 }
91}
92
93pub struct FlownodeFlowKeyInner {
95 flownode_id: FlownodeId,
96 flow_id: FlowId,
97 partition_id: FlowPartitionId,
98}
99
100impl FlownodeFlowKeyInner {
101 pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
103 Self {
104 flownode_id,
105 flow_id,
106 partition_id,
107 }
108 }
109
110 pub fn prefix(flownode_id: FlownodeId) -> String {
111 format!("{}/{flownode_id}/", FLOWNODE_FLOW_KEY_PREFIX)
112 }
113}
114
115impl<'a> MetadataKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
116 fn to_bytes(&self) -> Vec<u8> {
117 format!(
118 "{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}",
119 self.flownode_id, self.flow_id, self.partition_id,
120 )
121 .into_bytes()
122 }
123
124 fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKeyInner> {
125 let key = std::str::from_utf8(bytes).map_err(|e| {
126 error::InvalidMetadataSnafu {
127 err_msg: format!(
128 "FlownodeFlowKeyInner '{}' is not a valid UTF8 string: {e}",
129 String::from_utf8_lossy(bytes)
130 ),
131 }
132 .build()
133 })?;
134 let captures =
135 FLOWNODE_FLOW_KEY_PATTERN
136 .captures(key)
137 .context(error::InvalidMetadataSnafu {
138 err_msg: format!("Invalid FlownodeFlowKeyInner '{key}'"),
139 })?;
140 let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
142 let flow_id = captures[2].parse::<FlowId>().unwrap();
143 let partition_id = captures[3].parse::<FlowPartitionId>().unwrap();
144
145 Ok(FlownodeFlowKeyInner {
146 flownode_id,
147 flow_id,
148 partition_id,
149 })
150 }
151}
152
153pub struct FlownodeFlowManager {
155 kv_backend: KvBackendRef,
156}
157
158pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result<FlownodeFlowKey> {
160 FlownodeFlowKey::from_bytes(&kv.key)
161}
162
163impl FlownodeFlowManager {
164 pub fn new(kv_backend: KvBackendRef) -> Self {
166 Self { kv_backend }
167 }
168
169 pub async fn exists(
171 &self,
172 flownode_id: FlownodeId,
173 flow_id: FlowId,
174 partition_id: FlowPartitionId,
175 ) -> Result<bool> {
176 let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
177 Ok(self.kv_backend.get(&key).await?.is_some())
178 }
179
180 pub fn flows(
182 &self,
183 flownode_id: FlownodeId,
184 ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
185 let start_key = FlownodeFlowKey::range_start_key(flownode_id);
186 let req = RangeRequest::new().with_prefix(start_key);
187
188 let stream = PaginationStream::new(
189 self.kv_backend.clone(),
190 req,
191 DEFAULT_PAGE_SIZE,
192 flownode_flow_key_decoder,
193 )
194 .into_stream();
195
196 Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
197 }
198
199 pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
203 &self,
204 flow_id: FlowId,
205 flownode_ids: I,
206 ) -> Txn {
207 let txns = flownode_ids
208 .into_iter()
209 .map(|(partition_id, flownode_id)| {
210 let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
211 TxnOp::Put(key, vec![])
212 })
213 .collect::<Vec<_>>();
214
215 Txn::new().and_then(txns)
216 }
217
218 pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
223 &self,
224 flow_id: FlowId,
225 current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
226 flownode_ids: I,
227 ) -> Txn {
228 let del_txns =
229 current_flow_info
230 .flownode_ids()
231 .iter()
232 .map(|(partition_id, flownode_id)| {
233 let key = FlownodeFlowKey::new(*flownode_id, flow_id, *partition_id).to_bytes();
234 TxnOp::Delete(key)
235 });
236 let put_txns = flownode_ids.into_iter().map(|(partition_id, flownode_id)| {
237 let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
238 TxnOp::Put(key, vec![])
239 });
240 let txns = del_txns.chain(put_txns).collect::<Vec<_>>();
241
242 Txn::new().and_then(txns)
243 }
244}
245
#[cfg(test)]
mod tests {
    use crate::key::flow::flownode_flow::FlownodeFlowKey;
    use crate::key::MetadataKey;

    /// A full key and a flownode prefix serialize to the scoped layout.
    #[test]
    fn test_key_serialization() {
        let encoded_key = FlownodeFlowKey::new(1, 2, 0).to_bytes();
        assert_eq!(b"__flow/flownode/1/2/0".to_vec(), encoded_key);

        let encoded_prefix = FlownodeFlowKey::range_start_key(1);
        assert_eq!(b"__flow/flownode/1/".to_vec(), encoded_prefix);
    }

    /// A serialized key decodes back into its three identifiers.
    #[test]
    fn test_key_deserialization() {
        let encoded = b"__flow/flownode/1/2/0".to_vec();
        let decoded = FlownodeFlowKey::from_bytes(&encoded).unwrap();

        assert_eq!(decoded.flownode_id(), 1);
        assert_eq!(decoded.flow_id(), 2);
        assert_eq!(decoded.partition_id(), 0);
    }
}