common_meta/key/flow/
flow_route.rs1use futures::TryStreamExt;
16use lazy_static::lazy_static;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::{flownode_addr_helper, FlowScoped};
23use crate::key::node_address::NodeAddressKey;
24use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
25use crate::kv_backend::txn::{Txn, TxnOp};
26use crate::kv_backend::KvBackendRef;
27use crate::peer::Peer;
28use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
29use crate::rpc::store::RangeRequest;
30use crate::rpc::KeyValue;
31
32const FLOW_ROUTE_KEY_PREFIX: &str = "route";
33
34lazy_static! {
35 static ref FLOW_ROUTE_KEY_PATTERN: Regex =
36 Regex::new(&format!("^{FLOW_ROUTE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
37}
38
39#[derive(Debug, PartialEq)]
43pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
44
45impl FlowRouteKey {
46 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKey {
48 let inner = FlowRouteKeyInner::new(flow_id, partition_id);
49 FlowRouteKey(FlowScoped::new(inner))
50 }
51
52 pub fn range_start_key(flow_id: FlowId) -> Vec<u8> {
54 let inner = BytesAdapter::from(FlowRouteKeyInner::prefix(flow_id).into_bytes());
55
56 FlowScoped::new(inner).to_bytes()
57 }
58
59 pub fn flow_id(&self) -> FlowId {
61 self.0.flow_id
62 }
63
64 pub fn partition_id(&self) -> FlowPartitionId {
66 self.0.partition_id
67 }
68}
69
70impl<'a> MetadataKey<'a, FlowRouteKey> for FlowRouteKey {
71 fn to_bytes(&self) -> Vec<u8> {
72 self.0.to_bytes()
73 }
74
75 fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKey> {
76 Ok(FlowRouteKey(FlowScoped::<FlowRouteKeyInner>::from_bytes(
77 bytes,
78 )?))
79 }
80}
81
82#[derive(Debug, Clone, Copy, PartialEq)]
84struct FlowRouteKeyInner {
85 flow_id: FlowId,
86 partition_id: FlowPartitionId,
87}
88
89impl FlowRouteKeyInner {
90 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKeyInner {
92 FlowRouteKeyInner {
93 flow_id,
94 partition_id,
95 }
96 }
97
98 fn prefix(flow_id: FlowId) -> String {
99 format!("{}/{flow_id}/", FLOW_ROUTE_KEY_PREFIX)
100 }
101}
102
103impl<'a> MetadataKey<'a, FlowRouteKeyInner> for FlowRouteKeyInner {
104 fn to_bytes(&self) -> Vec<u8> {
105 format!(
106 "{FLOW_ROUTE_KEY_PREFIX}/{}/{}",
107 self.flow_id, self.partition_id
108 )
109 .into_bytes()
110 }
111
112 fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKeyInner> {
113 let key = std::str::from_utf8(bytes).map_err(|e| {
114 error::InvalidMetadataSnafu {
115 err_msg: format!(
116 "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
117 String::from_utf8_lossy(bytes)
118 ),
119 }
120 .build()
121 })?;
122 let captures =
123 FLOW_ROUTE_KEY_PATTERN
124 .captures(key)
125 .context(error::InvalidMetadataSnafu {
126 err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
127 })?;
128 let flow_id = captures[1].parse::<FlowId>().unwrap();
130 let partition_id = captures[2].parse::<FlowId>().unwrap();
131
132 Ok(FlowRouteKeyInner {
133 flow_id,
134 partition_id,
135 })
136 }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
141pub struct FlowRouteValue {
142 pub(crate) peer: Peer,
143}
144
145impl FlowRouteValue {
146 pub fn peer(&self) -> &Peer {
148 &self.peer
149 }
150}
151
152pub fn flow_route_decoder(kv: KeyValue) -> Result<(FlowRouteKey, FlowRouteValue)> {
154 let key = FlowRouteKey::from_bytes(&kv.key)?;
155 let value = FlowRouteValue::try_from_raw_value(&kv.value)?;
156 Ok((key, value))
157}
158
159pub struct FlowRouteManager {
161 kv_backend: KvBackendRef,
162}
163
164impl FlowRouteManager {
165 pub fn new(kv_backend: KvBackendRef) -> Self {
167 Self { kv_backend }
168 }
169
170 pub async fn routes(&self, flow_id: FlowId) -> Result<Vec<(FlowRouteKey, FlowRouteValue)>> {
172 let start_key = FlowRouteKey::range_start_key(flow_id);
173 let req = RangeRequest::new().with_prefix(start_key);
174 let stream = PaginationStream::new(
175 self.kv_backend.clone(),
176 req,
177 DEFAULT_PAGE_SIZE,
178 flow_route_decoder,
179 )
180 .into_stream();
181
182 let mut res = stream.try_collect::<Vec<_>>().await?;
183 self.remap_flow_route_addresses(&mut res).await?;
184 Ok(res)
185 }
186
187 pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
191 &self,
192 flow_id: FlowId,
193 flow_routes: I,
194 ) -> Result<Txn> {
195 let txns = flow_routes
196 .into_iter()
197 .map(|(partition_id, route)| {
198 let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
199
200 Ok(TxnOp::Put(key, route.try_as_raw_value()?))
201 })
202 .collect::<Result<Vec<_>>>()?;
203
204 Ok(Txn::new().and_then(txns))
205 }
206
207 async fn remap_flow_route_addresses(
208 &self,
209 flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
210 ) -> Result<()> {
211 let keys = flow_routes
212 .iter()
213 .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
214 .collect();
215 let flow_node_addrs =
216 flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
217 for (_, flow_route_value) in flow_routes.iter_mut() {
218 let flownode_id = flow_route_value.peer.id;
219 if let Some(node_addr) = flow_node_addrs.get(&flownode_id) {
223 flow_route_value.peer.addr = node_addr.peer.addr.clone();
224 }
225 }
226 Ok(())
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::FlowRouteKey;
233 use crate::key::MetadataKey;
234
235 #[test]
236 fn test_key_serialization() {
237 let flow_route_key = FlowRouteKey::new(1, 2);
238 assert_eq!(b"__flow/route/1/2".to_vec(), flow_route_key.to_bytes());
239 }
240
241 #[test]
242 fn test_key_deserialization() {
243 let bytes = b"__flow/route/1/2".to_vec();
244 let key = FlowRouteKey::from_bytes(&bytes).unwrap();
245 assert_eq!(key.flow_id(), 1);
246 assert_eq!(key.partition_id(), 2);
247 }
248
249 #[test]
250 fn test_key_start_range() {
251 assert_eq!(
252 b"__flow/route/2/".to_vec(),
253 FlowRouteKey::range_start_key(2)
254 );
255 }
256}