1use std::sync::Arc;
16
17use futures::TryStreamExt;
18use lazy_static::lazy_static;
19use regex::Regex;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23
24use crate::error::{self, Result};
25use crate::key::flow::{flownode_addr_helper, FlowScoped};
26use crate::key::node_address::NodeAddressKey;
27use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
32use crate::rpc::store::RangeRequest;
33use crate::rpc::KeyValue;
34use crate::FlownodeId;
35
36const TABLE_FLOW_KEY_PREFIX: &str = "source_table";
37
38lazy_static! {
39 static ref TABLE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
40 "^{TABLE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
41 ))
42 .unwrap();
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47struct TableFlowKeyInner {
48 table_id: TableId,
49 flownode_id: FlownodeId,
50 flow_id: FlowId,
51 partition_id: FlowPartitionId,
52}
53
54#[derive(Debug, PartialEq)]
58pub struct TableFlowKey(FlowScoped<TableFlowKeyInner>);
59
60impl<'a> MetadataKey<'a, TableFlowKey> for TableFlowKey {
61 fn to_bytes(&self) -> Vec<u8> {
62 self.0.to_bytes()
63 }
64
65 fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKey> {
66 Ok(TableFlowKey(FlowScoped::<TableFlowKeyInner>::from_bytes(
67 bytes,
68 )?))
69 }
70}
71
72impl TableFlowKey {
73 pub fn new(
75 table_id: TableId,
76 flownode_id: FlownodeId,
77 flow_id: FlowId,
78 partition_id: FlowPartitionId,
79 ) -> TableFlowKey {
80 let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
81 TableFlowKey(FlowScoped::new(inner))
82 }
83
84 pub fn range_start_key(table_id: TableId) -> Vec<u8> {
86 let inner = BytesAdapter::from(TableFlowKeyInner::prefix(table_id).into_bytes());
87
88 FlowScoped::new(inner).to_bytes()
89 }
90
91 pub fn source_table_id(&self) -> TableId {
93 self.0.table_id
94 }
95
96 pub fn flow_id(&self) -> FlowId {
98 self.0.flow_id
99 }
100
101 pub fn flownode_id(&self) -> FlownodeId {
103 self.0.flownode_id
104 }
105
106 pub fn partition_id(&self) -> FlowPartitionId {
108 self.0.partition_id
109 }
110}
111
112impl TableFlowKeyInner {
113 fn new(
115 table_id: TableId,
116 flownode_id: FlownodeId,
117 flow_id: FlowId,
118 partition_id: FlowPartitionId,
119 ) -> TableFlowKeyInner {
120 Self {
121 table_id,
122 flownode_id,
123 flow_id,
124 partition_id,
125 }
126 }
127
128 fn prefix(table_id: TableId) -> String {
129 format!("{}/{table_id}/", TABLE_FLOW_KEY_PREFIX)
130 }
131}
132
133impl<'a> MetadataKey<'a, TableFlowKeyInner> for TableFlowKeyInner {
134 fn to_bytes(&self) -> Vec<u8> {
135 format!(
136 "{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}",
137 self.table_id, self.flownode_id, self.flow_id, self.partition_id
138 )
139 .into_bytes()
140 }
141
142 fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKeyInner> {
143 let key = std::str::from_utf8(bytes).map_err(|e| {
144 error::InvalidMetadataSnafu {
145 err_msg: format!(
146 "TableFlowKeyInner '{}' is not a valid UTF8 string: {e}",
147 String::from_utf8_lossy(bytes)
148 ),
149 }
150 .build()
151 })?;
152 let captures =
153 TABLE_FLOW_KEY_PATTERN
154 .captures(key)
155 .context(error::InvalidMetadataSnafu {
156 err_msg: format!("Invalid TableFlowKeyInner '{key}'"),
157 })?;
158 let table_id = captures[1].parse::<TableId>().unwrap();
160 let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
161 let flow_id = captures[3].parse::<FlowId>().unwrap();
162 let partition_id = captures[4].parse::<FlowPartitionId>().unwrap();
163 Ok(TableFlowKeyInner::new(
164 table_id,
165 flownode_id,
166 flow_id,
167 partition_id,
168 ))
169 }
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
173pub struct TableFlowValue {
174 pub(crate) peer: Peer,
175}
176
177pub fn table_flow_decoder(kv: KeyValue) -> Result<(TableFlowKey, TableFlowValue)> {
179 let key = TableFlowKey::from_bytes(&kv.key)?;
180 let value = TableFlowValue::try_from_raw_value(&kv.value)?;
181 Ok((key, value))
182}
183
184pub type TableFlowManagerRef = Arc<TableFlowManager>;
185
186pub struct TableFlowManager {
188 kv_backend: KvBackendRef,
189}
190
191impl TableFlowManager {
192 pub fn new(kv_backend: KvBackendRef) -> Self {
194 Self { kv_backend }
195 }
196
197 pub async fn flows(&self, table_id: TableId) -> Result<Vec<(TableFlowKey, TableFlowValue)>> {
201 let start_key = TableFlowKey::range_start_key(table_id);
202 let req = RangeRequest::new().with_prefix(start_key);
203 let stream = PaginationStream::new(
204 self.kv_backend.clone(),
205 req,
206 DEFAULT_PAGE_SIZE,
207 table_flow_decoder,
208 )
209 .into_stream();
210
211 let mut res = stream.try_collect::<Vec<_>>().await?;
212 self.remap_table_flow_addresses(&mut res).await?;
213 Ok(res)
214 }
215
216 pub fn build_create_txn(
220 &self,
221 flow_id: FlowId,
222 table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
223 source_table_ids: &[TableId],
224 ) -> Result<Txn> {
225 let mut txns = Vec::with_capacity(source_table_ids.len() * table_flow_values.len());
226
227 for (partition_id, table_flow_value) in table_flow_values {
228 let flownode_id = table_flow_value.peer.id;
229 let value = table_flow_value.try_as_raw_value()?;
230 for source_table_id in source_table_ids {
231 txns.push(TxnOp::Put(
232 TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
233 .to_bytes(),
234 value.clone(),
235 ));
236 }
237 }
238
239 Ok(Txn::new().and_then(txns))
240 }
241
242 async fn remap_table_flow_addresses(
243 &self,
244 table_flows: &mut [(TableFlowKey, TableFlowValue)],
245 ) -> Result<()> {
246 let keys = table_flows
247 .iter()
248 .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
249 .collect::<Vec<_>>();
250 let flownode_addrs =
251 flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
252 for (_, table_flow_value) in table_flows.iter_mut() {
253 let flownode_id = table_flow_value.peer.id;
254 if let Some(flownode_addr) = flownode_addrs.get(&flownode_id) {
258 table_flow_value.peer.addr = flownode_addr.peer.addr.clone();
259 }
260 }
261 Ok(())
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use super::*;
268
269 #[test]
270 fn test_key_serialization() {
271 let table_flow_key = TableFlowKey::new(1024, 1, 2, 0);
272 assert_eq!(
273 b"__flow/source_table/1024/1/2/0".to_vec(),
274 table_flow_key.to_bytes(),
275 );
276 let prefix = TableFlowKey::range_start_key(1024);
277 assert_eq!(b"__flow/source_table/1024/".to_vec(), prefix);
278 }
279
280 #[test]
281 fn test_key_deserialization() {
282 let bytes = b"__flow/source_table/1024/1/2/0".to_vec();
283 let key = TableFlowKey::from_bytes(&bytes).unwrap();
284 assert_eq!(key.source_table_id(), 1024);
285 assert_eq!(key.flownode_id(), 1);
286 assert_eq!(key.flow_id(), 2);
287 assert_eq!(key.partition_id(), 0);
288 }
289}