1use std::sync::Arc;
16
17use futures::TryStreamExt;
18use lazy_static::lazy_static;
19use regex::Regex;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23
24use crate::error::{self, Result};
25use crate::key::flow::flow_info::FlowInfoValue;
26use crate::key::flow::{flownode_addr_helper, FlowScoped};
27use crate::key::node_address::NodeAddressKey;
28use crate::key::{
29 BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
30};
31use crate::kv_backend::txn::{Txn, TxnOp};
32use crate::kv_backend::KvBackendRef;
33use crate::peer::Peer;
34use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
35use crate::rpc::store::RangeRequest;
36use crate::rpc::KeyValue;
37use crate::FlownodeId;
38
39const TABLE_FLOW_KEY_PREFIX: &str = "source_table";
40
41lazy_static! {
42 static ref TABLE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
43 "^{TABLE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
44 ))
45 .unwrap();
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50struct TableFlowKeyInner {
51 table_id: TableId,
52 flownode_id: FlownodeId,
53 flow_id: FlowId,
54 partition_id: FlowPartitionId,
55}
56
57#[derive(Debug, PartialEq)]
61pub struct TableFlowKey(FlowScoped<TableFlowKeyInner>);
62
63impl<'a> MetadataKey<'a, TableFlowKey> for TableFlowKey {
64 fn to_bytes(&self) -> Vec<u8> {
65 self.0.to_bytes()
66 }
67
68 fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKey> {
69 Ok(TableFlowKey(FlowScoped::<TableFlowKeyInner>::from_bytes(
70 bytes,
71 )?))
72 }
73}
74
75impl TableFlowKey {
76 pub fn new(
78 table_id: TableId,
79 flownode_id: FlownodeId,
80 flow_id: FlowId,
81 partition_id: FlowPartitionId,
82 ) -> TableFlowKey {
83 let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
84 TableFlowKey(FlowScoped::new(inner))
85 }
86
87 pub fn range_start_key(table_id: TableId) -> Vec<u8> {
89 let inner = BytesAdapter::from(TableFlowKeyInner::prefix(table_id).into_bytes());
90
91 FlowScoped::new(inner).to_bytes()
92 }
93
94 pub fn source_table_id(&self) -> TableId {
96 self.0.table_id
97 }
98
99 pub fn flow_id(&self) -> FlowId {
101 self.0.flow_id
102 }
103
104 pub fn flownode_id(&self) -> FlownodeId {
106 self.0.flownode_id
107 }
108
109 pub fn partition_id(&self) -> FlowPartitionId {
111 self.0.partition_id
112 }
113}
114
115impl TableFlowKeyInner {
116 fn new(
118 table_id: TableId,
119 flownode_id: FlownodeId,
120 flow_id: FlowId,
121 partition_id: FlowPartitionId,
122 ) -> TableFlowKeyInner {
123 Self {
124 table_id,
125 flownode_id,
126 flow_id,
127 partition_id,
128 }
129 }
130
131 fn prefix(table_id: TableId) -> String {
132 format!("{}/{table_id}/", TABLE_FLOW_KEY_PREFIX)
133 }
134}
135
136impl<'a> MetadataKey<'a, TableFlowKeyInner> for TableFlowKeyInner {
137 fn to_bytes(&self) -> Vec<u8> {
138 format!(
139 "{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}",
140 self.table_id, self.flownode_id, self.flow_id, self.partition_id
141 )
142 .into_bytes()
143 }
144
145 fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKeyInner> {
146 let key = std::str::from_utf8(bytes).map_err(|e| {
147 error::InvalidMetadataSnafu {
148 err_msg: format!(
149 "TableFlowKeyInner '{}' is not a valid UTF8 string: {e}",
150 String::from_utf8_lossy(bytes)
151 ),
152 }
153 .build()
154 })?;
155 let captures =
156 TABLE_FLOW_KEY_PATTERN
157 .captures(key)
158 .context(error::InvalidMetadataSnafu {
159 err_msg: format!("Invalid TableFlowKeyInner '{key}'"),
160 })?;
161 let table_id = captures[1].parse::<TableId>().unwrap();
163 let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
164 let flow_id = captures[3].parse::<FlowId>().unwrap();
165 let partition_id = captures[4].parse::<FlowPartitionId>().unwrap();
166 Ok(TableFlowKeyInner::new(
167 table_id,
168 flownode_id,
169 flow_id,
170 partition_id,
171 ))
172 }
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176pub struct TableFlowValue {
177 pub(crate) peer: Peer,
178}
179
180pub fn table_flow_decoder(kv: KeyValue) -> Result<(TableFlowKey, TableFlowValue)> {
182 let key = TableFlowKey::from_bytes(&kv.key)?;
183 let value = TableFlowValue::try_from_raw_value(&kv.value)?;
184 Ok((key, value))
185}
186
187pub type TableFlowManagerRef = Arc<TableFlowManager>;
188
189pub struct TableFlowManager {
191 kv_backend: KvBackendRef,
192}
193
194impl TableFlowManager {
195 pub fn new(kv_backend: KvBackendRef) -> Self {
197 Self { kv_backend }
198 }
199
200 pub async fn flows(&self, table_id: TableId) -> Result<Vec<(TableFlowKey, TableFlowValue)>> {
204 let start_key = TableFlowKey::range_start_key(table_id);
205 let req = RangeRequest::new().with_prefix(start_key);
206 let stream = PaginationStream::new(
207 self.kv_backend.clone(),
208 req,
209 DEFAULT_PAGE_SIZE,
210 table_flow_decoder,
211 )
212 .into_stream();
213
214 let mut res = stream.try_collect::<Vec<_>>().await?;
215 self.remap_table_flow_addresses(&mut res).await?;
216 Ok(res)
217 }
218
219 pub fn build_create_txn(
223 &self,
224 flow_id: FlowId,
225 table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
226 source_table_ids: &[TableId],
227 ) -> Result<Txn> {
228 let mut txns = Vec::with_capacity(source_table_ids.len() * table_flow_values.len());
229
230 for (partition_id, table_flow_value) in table_flow_values {
231 let flownode_id = table_flow_value.peer.id;
232 let value = table_flow_value.try_as_raw_value()?;
233 for source_table_id in source_table_ids {
234 txns.push(TxnOp::Put(
235 TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
236 .to_bytes(),
237 value.clone(),
238 ));
239 }
240 }
241
242 Ok(Txn::new().and_then(txns))
243 }
244
245 pub fn build_update_txn(
251 &self,
252 flow_id: FlowId,
253 current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
254 table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
255 source_table_ids: &[TableId],
256 ) -> Result<Txn> {
257 let mut txns = Vec::with_capacity(2 * source_table_ids.len() * table_flow_values.len());
258
259 for (part_id, node_id) in current_flow_info.flownode_ids() {
261 for source_table_id in current_flow_info.source_table_ids() {
262 txns.push(TxnOp::Delete(
263 TableFlowKey::new(*source_table_id, *node_id, flow_id, *part_id).to_bytes(),
264 ));
265 }
266 }
267
268 for (partition_id, table_flow_value) in table_flow_values {
269 let flownode_id = table_flow_value.peer.id;
270 let value = table_flow_value.try_as_raw_value()?;
271 for source_table_id in source_table_ids {
272 txns.push(TxnOp::Put(
273 TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
274 .to_bytes(),
275 value.clone(),
276 ));
277 }
278 }
279
280 Ok(Txn::new().and_then(txns))
281 }
282
283 async fn remap_table_flow_addresses(
284 &self,
285 table_flows: &mut [(TableFlowKey, TableFlowValue)],
286 ) -> Result<()> {
287 let keys = table_flows
288 .iter()
289 .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
290 .collect::<Vec<_>>();
291 let flownode_addrs =
292 flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
293 for (_, table_flow_value) in table_flows.iter_mut() {
294 let flownode_id = table_flow_value.peer.id;
295 if let Some(flownode_addr) = flownode_addrs.get(&flownode_id) {
299 table_flow_value.peer.addr = flownode_addr.peer.addr.clone();
300 }
301 }
302 Ok(())
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_serialization() {
        let encoded = TableFlowKey::new(1024, 1, 2, 0).to_bytes();
        assert_eq!(b"__flow/source_table/1024/1/2/0".to_vec(), encoded);

        let range_prefix = TableFlowKey::range_start_key(1024);
        assert_eq!(b"__flow/source_table/1024/".to_vec(), range_prefix);
    }

    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow/source_table/1024/1/2/0".to_vec();
        let decoded = TableFlowKey::from_bytes(&raw).unwrap();

        let components = (
            decoded.source_table_id(),
            decoded.flownode_id(),
            decoded.flow_id(),
            decoded.partition_id(),
        );
        assert_eq!(components, (1024, 1, 2, 0));
    }
}