common_meta/key/flow/
flownode_flow.rs1use futures::stream::BoxStream;
16use futures::TryStreamExt;
17use lazy_static::lazy_static;
18use regex::Regex;
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::FlowScoped;
23use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey};
24use crate::kv_backend::txn::{Txn, TxnOp};
25use crate::kv_backend::KvBackendRef;
26use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
27use crate::rpc::store::RangeRequest;
28use crate::rpc::KeyValue;
29use crate::FlownodeId;
30
31lazy_static! {
32 static ref FLOWNODE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
33 "^{FLOWNODE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
34 ))
35 .unwrap();
36}
37
38const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode";
39
/// Key made of a [`FlownodeFlowKeyInner`] wrapped in [`FlowScoped`].
///
/// The tests in this file show it serializing to
/// `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`.
pub struct FlownodeFlowKey(FlowScoped<FlownodeFlowKeyInner>);
44
45impl<'a> MetadataKey<'a, FlownodeFlowKey> for FlownodeFlowKey {
46 fn to_bytes(&self) -> Vec<u8> {
47 self.0.to_bytes()
48 }
49
50 fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKey> {
51 Ok(FlownodeFlowKey(
52 FlowScoped::<FlownodeFlowKeyInner>::from_bytes(bytes)?,
53 ))
54 }
55}
56
57impl FlownodeFlowKey {
58 pub fn new(
60 flownode_id: FlownodeId,
61 flow_id: FlowId,
62 partition_id: FlowPartitionId,
63 ) -> FlownodeFlowKey {
64 let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
65 FlownodeFlowKey(FlowScoped::new(inner))
66 }
67
68 pub fn range_start_key(flownode_id: FlownodeId) -> Vec<u8> {
70 let inner = BytesAdapter::from(FlownodeFlowKeyInner::prefix(flownode_id).into_bytes());
71
72 FlowScoped::new(inner).to_bytes()
73 }
74
75 pub fn flow_id(&self) -> FlowId {
77 self.0.flow_id
78 }
79
80 #[cfg(test)]
81 pub fn flownode_id(&self) -> FlownodeId {
83 self.0.flownode_id
84 }
85
86 pub fn partition_id(&self) -> FlowPartitionId {
88 self.0.partition_id
89 }
90}
91
/// The unscoped part of a flownode-flow key.
///
/// Serialized as `flownode/{flownode_id}/{flow_id}/{partition_id}`.
pub struct FlownodeFlowKeyInner {
    // First path component after the prefix.
    flownode_id: FlownodeId,
    // Second path component after the prefix.
    flow_id: FlowId,
    // Third (last) path component after the prefix.
    partition_id: FlowPartitionId,
}
98
99impl FlownodeFlowKeyInner {
100 pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
102 Self {
103 flownode_id,
104 flow_id,
105 partition_id,
106 }
107 }
108
109 pub fn prefix(flownode_id: FlownodeId) -> String {
110 format!("{}/{flownode_id}/", FLOWNODE_FLOW_KEY_PREFIX)
111 }
112}
113
114impl<'a> MetadataKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
115 fn to_bytes(&self) -> Vec<u8> {
116 format!(
117 "{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}",
118 self.flownode_id, self.flow_id, self.partition_id,
119 )
120 .into_bytes()
121 }
122
123 fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKeyInner> {
124 let key = std::str::from_utf8(bytes).map_err(|e| {
125 error::InvalidMetadataSnafu {
126 err_msg: format!(
127 "FlownodeFlowKeyInner '{}' is not a valid UTF8 string: {e}",
128 String::from_utf8_lossy(bytes)
129 ),
130 }
131 .build()
132 })?;
133 let captures =
134 FLOWNODE_FLOW_KEY_PATTERN
135 .captures(key)
136 .context(error::InvalidMetadataSnafu {
137 err_msg: format!("Invalid FlownodeFlowKeyInner '{key}'"),
138 })?;
139 let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
141 let flow_id = captures[2].parse::<FlowId>().unwrap();
142 let partition_id = captures[3].parse::<FlowPartitionId>().unwrap();
143
144 Ok(FlownodeFlowKeyInner {
145 flownode_id,
146 flow_id,
147 partition_id,
148 })
149 }
150}
151
/// Reads and builds write operations for [`FlownodeFlowKey`] entries in a
/// key-value backend.
pub struct FlownodeFlowManager {
    // Shared handle to the backend used for range reads.
    kv_backend: KvBackendRef,
}
156
157pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result<FlownodeFlowKey> {
159 FlownodeFlowKey::from_bytes(&kv.key)
160}
161
162impl FlownodeFlowManager {
163 pub fn new(kv_backend: KvBackendRef) -> Self {
165 Self { kv_backend }
166 }
167
168 pub fn flows(
170 &self,
171 flownode_id: FlownodeId,
172 ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
173 let start_key = FlownodeFlowKey::range_start_key(flownode_id);
174 let req = RangeRequest::new().with_prefix(start_key);
175
176 let stream = PaginationStream::new(
177 self.kv_backend.clone(),
178 req,
179 DEFAULT_PAGE_SIZE,
180 flownode_flow_key_decoder,
181 )
182 .into_stream();
183
184 Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
185 }
186
187 pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
191 &self,
192 flow_id: FlowId,
193 flownode_ids: I,
194 ) -> Txn {
195 let txns = flownode_ids
196 .into_iter()
197 .map(|(partition_id, flownode_id)| {
198 let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
199 TxnOp::Put(key, vec![])
200 })
201 .collect::<Vec<_>>();
202
203 Txn::new().and_then(txns)
204 }
205}
206
#[cfg(test)]
mod tests {
    use crate::key::flow::flownode_flow::FlownodeFlowKey;
    use crate::key::MetadataKey;

    /// A full key and a per-flownode range prefix serialize to the expected bytes.
    #[test]
    fn test_key_serialization() {
        let expected_key = b"__flow/flownode/1/2/0".to_vec();
        let key = FlownodeFlowKey::new(1, 2, 0);
        assert_eq!(expected_key, key.to_bytes());

        let expected_prefix = b"__flow/flownode/1/".to_vec();
        let range_start = FlownodeFlowKey::range_start_key(1);
        assert_eq!(expected_prefix, range_start);
    }

    /// A serialized key decodes back into its three components.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow/flownode/1/2/0".to_vec();
        let decoded = FlownodeFlowKey::from_bytes(&raw).unwrap();

        assert_eq!(decoded.flownode_id(), 1);
        assert_eq!(decoded.flow_id(), 2);
        assert_eq!(decoded.partition_id(), 0);
    }
}