common_meta/key/flow/
flow_route.rs1use futures::TryStreamExt;
16use lazy_static::lazy_static;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::flow_info::FlowInfoValue;
23use crate::key::flow::{flownode_addr_helper, FlowScoped};
24use crate::key::node_address::NodeAddressKey;
25use crate::key::{
26 BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
27};
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
32use crate::rpc::store::RangeRequest;
33use crate::rpc::KeyValue;
34
35const FLOW_ROUTE_KEY_PREFIX: &str = "route";
36
37lazy_static! {
38 static ref FLOW_ROUTE_KEY_PATTERN: Regex =
39 Regex::new(&format!("^{FLOW_ROUTE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
40}
41
42#[derive(Debug, Clone, PartialEq)]
46pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
47
48impl FlowRouteKey {
49 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKey {
51 let inner = FlowRouteKeyInner::new(flow_id, partition_id);
52 FlowRouteKey(FlowScoped::new(inner))
53 }
54
55 pub fn range_start_key(flow_id: FlowId) -> Vec<u8> {
57 let inner = BytesAdapter::from(FlowRouteKeyInner::prefix(flow_id).into_bytes());
58
59 FlowScoped::new(inner).to_bytes()
60 }
61
62 pub fn flow_id(&self) -> FlowId {
64 self.0.flow_id
65 }
66
67 pub fn partition_id(&self) -> FlowPartitionId {
69 self.0.partition_id
70 }
71}
72
73impl<'a> MetadataKey<'a, FlowRouteKey> for FlowRouteKey {
74 fn to_bytes(&self) -> Vec<u8> {
75 self.0.to_bytes()
76 }
77
78 fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKey> {
79 Ok(FlowRouteKey(FlowScoped::<FlowRouteKeyInner>::from_bytes(
80 bytes,
81 )?))
82 }
83}
84
85#[derive(Debug, Clone, Copy, PartialEq)]
87struct FlowRouteKeyInner {
88 flow_id: FlowId,
89 partition_id: FlowPartitionId,
90}
91
92impl FlowRouteKeyInner {
93 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKeyInner {
95 FlowRouteKeyInner {
96 flow_id,
97 partition_id,
98 }
99 }
100
101 fn prefix(flow_id: FlowId) -> String {
102 format!("{}/{flow_id}/", FLOW_ROUTE_KEY_PREFIX)
103 }
104}
105
106impl<'a> MetadataKey<'a, FlowRouteKeyInner> for FlowRouteKeyInner {
107 fn to_bytes(&self) -> Vec<u8> {
108 format!(
109 "{FLOW_ROUTE_KEY_PREFIX}/{}/{}",
110 self.flow_id, self.partition_id
111 )
112 .into_bytes()
113 }
114
115 fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKeyInner> {
116 let key = std::str::from_utf8(bytes).map_err(|e| {
117 error::InvalidMetadataSnafu {
118 err_msg: format!(
119 "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
120 String::from_utf8_lossy(bytes)
121 ),
122 }
123 .build()
124 })?;
125 let captures =
126 FLOW_ROUTE_KEY_PATTERN
127 .captures(key)
128 .context(error::InvalidMetadataSnafu {
129 err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
130 })?;
131 let flow_id = captures[1].parse::<FlowId>().unwrap();
133 let partition_id = captures[2].parse::<FlowId>().unwrap();
134
135 Ok(FlowRouteKeyInner {
136 flow_id,
137 partition_id,
138 })
139 }
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
144pub struct FlowRouteValue {
145 pub(crate) peer: Peer,
146}
147
148impl From<Peer> for FlowRouteValue {
149 fn from(peer: Peer) -> Self {
150 Self { peer }
151 }
152}
153
154impl FlowRouteValue {
155 pub fn peer(&self) -> &Peer {
157 &self.peer
158 }
159}
160
161pub fn flow_route_decoder(kv: KeyValue) -> Result<(FlowRouteKey, FlowRouteValue)> {
163 let key = FlowRouteKey::from_bytes(&kv.key)?;
164 let value = FlowRouteValue::try_from_raw_value(&kv.value)?;
165 Ok((key, value))
166}
167
168pub struct FlowRouteManager {
170 kv_backend: KvBackendRef,
171}
172
173impl FlowRouteManager {
174 pub fn new(kv_backend: KvBackendRef) -> Self {
176 Self { kv_backend }
177 }
178
179 pub async fn routes(&self, flow_id: FlowId) -> Result<Vec<(FlowRouteKey, FlowRouteValue)>> {
181 let start_key = FlowRouteKey::range_start_key(flow_id);
182 let req = RangeRequest::new().with_prefix(start_key);
183 let stream = PaginationStream::new(
184 self.kv_backend.clone(),
185 req,
186 DEFAULT_PAGE_SIZE,
187 flow_route_decoder,
188 )
189 .into_stream();
190
191 let mut res = stream.try_collect::<Vec<_>>().await?;
192 self.remap_flow_route_addresses(&mut res).await?;
193 Ok(res)
194 }
195
196 pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
200 &self,
201 flow_id: FlowId,
202 flow_routes: I,
203 ) -> Result<Txn> {
204 let txns = flow_routes
205 .into_iter()
206 .map(|(partition_id, route)| {
207 let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
208
209 Ok(TxnOp::Put(key, route.try_as_raw_value()?))
210 })
211 .collect::<Result<Vec<_>>>()?;
212
213 Ok(Txn::new().and_then(txns))
214 }
215
216 pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
221 &self,
222 flow_id: FlowId,
223 current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
224 flow_routes: I,
225 ) -> Result<Txn> {
226 let del_txns = current_flow_info.flownode_ids().keys().map(|partition_id| {
227 let key = FlowRouteKey::new(flow_id, *partition_id).to_bytes();
228 Ok(TxnOp::Delete(key))
229 });
230
231 let put_txns = flow_routes.into_iter().map(|(partition_id, route)| {
232 let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
233
234 Ok(TxnOp::Put(key, route.try_as_raw_value()?))
235 });
236 let txns = del_txns.chain(put_txns).collect::<Result<Vec<_>>>()?;
237 Ok(Txn::new().and_then(txns))
238 }
239
240 async fn remap_flow_route_addresses(
241 &self,
242 flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
243 ) -> Result<()> {
244 let keys = flow_routes
245 .iter()
246 .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
247 .collect();
248 let flow_node_addrs =
249 flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
250 for (_, flow_route_value) in flow_routes.iter_mut() {
251 let flownode_id = flow_route_value.peer.id;
252 if let Some(node_addr) = flow_node_addrs.get(&flownode_id) {
256 flow_route_value.peer.addr = node_addr.peer.addr.clone();
257 }
258 }
259 Ok(())
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use super::FlowRouteKey;
266 use crate::key::MetadataKey;
267
268 #[test]
269 fn test_key_serialization() {
270 let flow_route_key = FlowRouteKey::new(1, 2);
271 assert_eq!(b"__flow/route/1/2".to_vec(), flow_route_key.to_bytes());
272 }
273
274 #[test]
275 fn test_key_deserialization() {
276 let bytes = b"__flow/route/1/2".to_vec();
277 let key = FlowRouteKey::from_bytes(&bytes).unwrap();
278 assert_eq!(key.flow_id(), 1);
279 assert_eq!(key.partition_id(), 2);
280 }
281
282 #[test]
283 fn test_key_start_range() {
284 assert_eq!(
285 b"__flow/route/2/".to_vec(),
286 FlowRouteKey::range_start_key(2)
287 );
288 }
289}