common_meta/key/flow/
table_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::TryStreamExt;
18use lazy_static::lazy_static;
19use regex::Regex;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23
24use crate::error::{self, Result};
25use crate::key::flow::flow_info::FlowInfoValue;
26use crate::key::flow::{flownode_addr_helper, FlowScoped};
27use crate::key::node_address::NodeAddressKey;
28use crate::key::{
29    BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
30};
31use crate::kv_backend::txn::{Txn, TxnOp};
32use crate::kv_backend::KvBackendRef;
33use crate::peer::Peer;
34use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
35use crate::rpc::store::RangeRequest;
36use crate::rpc::KeyValue;
37use crate::FlownodeId;
38
/// The leading segment of every un-scoped table flow key.
const TABLE_FLOW_KEY_PREFIX: &str = "source_table";

lazy_static! {
    // Matches a complete un-scoped key:
    // `source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`.
    // The four capture groups are the decimal components, in that order.
    // Only the characters are validated here; the pattern does not bound
    // how many digits a component may have.
    static ref TABLE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
        "^{TABLE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
    ))
    .unwrap();
}
47
/// The key of mapping [TableId] to [FlownodeId] and [FlowId].
///
/// This is the un-scoped part of the key; it serializes to
/// `source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TableFlowKeyInner {
    /// Id of the source table.
    table_id: TableId,
    /// Id of the flownode.
    flownode_id: FlownodeId,
    /// Id of the flow.
    flow_id: FlowId,
    /// Id of the flow partition.
    partition_id: FlowPartitionId,
}
56
/// The key of mapping [TableId] to [FlownodeId] and [FlowId].
///
/// The layout: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`.
///
/// Wraps the un-scoped [TableFlowKeyInner] in [FlowScoped]; serialization and
/// deserialization are delegated to that wrapper.
#[derive(Debug, PartialEq)]
pub struct TableFlowKey(FlowScoped<TableFlowKeyInner>);
62
63impl<'a> MetadataKey<'a, TableFlowKey> for TableFlowKey {
64    fn to_bytes(&self) -> Vec<u8> {
65        self.0.to_bytes()
66    }
67
68    fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKey> {
69        Ok(TableFlowKey(FlowScoped::<TableFlowKeyInner>::from_bytes(
70            bytes,
71        )?))
72    }
73}
74
75impl TableFlowKey {
76    /// Returns a new [TableFlowKey].
77    pub fn new(
78        table_id: TableId,
79        flownode_id: FlownodeId,
80        flow_id: FlowId,
81        partition_id: FlowPartitionId,
82    ) -> TableFlowKey {
83        let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
84        TableFlowKey(FlowScoped::new(inner))
85    }
86
87    /// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`.
88    pub fn range_start_key(table_id: TableId) -> Vec<u8> {
89        let inner = BytesAdapter::from(TableFlowKeyInner::prefix(table_id).into_bytes());
90
91        FlowScoped::new(inner).to_bytes()
92    }
93
94    /// Returns the source [TableId].
95    pub fn source_table_id(&self) -> TableId {
96        self.0.table_id
97    }
98
99    /// Returns the [FlowId].
100    pub fn flow_id(&self) -> FlowId {
101        self.0.flow_id
102    }
103
104    /// Returns the [FlownodeId].
105    pub fn flownode_id(&self) -> FlownodeId {
106        self.0.flownode_id
107    }
108
109    /// Returns the [PartitionId].
110    pub fn partition_id(&self) -> FlowPartitionId {
111        self.0.partition_id
112    }
113}
114
115impl TableFlowKeyInner {
116    /// Returns a new [TableFlowKey].
117    fn new(
118        table_id: TableId,
119        flownode_id: FlownodeId,
120        flow_id: FlowId,
121        partition_id: FlowPartitionId,
122    ) -> TableFlowKeyInner {
123        Self {
124            table_id,
125            flownode_id,
126            flow_id,
127            partition_id,
128        }
129    }
130
131    fn prefix(table_id: TableId) -> String {
132        format!("{}/{table_id}/", TABLE_FLOW_KEY_PREFIX)
133    }
134}
135
136impl<'a> MetadataKey<'a, TableFlowKeyInner> for TableFlowKeyInner {
137    fn to_bytes(&self) -> Vec<u8> {
138        format!(
139            "{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}",
140            self.table_id, self.flownode_id, self.flow_id, self.partition_id
141        )
142        .into_bytes()
143    }
144
145    fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKeyInner> {
146        let key = std::str::from_utf8(bytes).map_err(|e| {
147            error::InvalidMetadataSnafu {
148                err_msg: format!(
149                    "TableFlowKeyInner '{}' is not a valid UTF8 string: {e}",
150                    String::from_utf8_lossy(bytes)
151                ),
152            }
153            .build()
154        })?;
155        let captures =
156            TABLE_FLOW_KEY_PATTERN
157                .captures(key)
158                .context(error::InvalidMetadataSnafu {
159                    err_msg: format!("Invalid TableFlowKeyInner '{key}'"),
160                })?;
161        // Safety: pass the regex check above
162        let table_id = captures[1].parse::<TableId>().unwrap();
163        let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
164        let flow_id = captures[3].parse::<FlowId>().unwrap();
165        let partition_id = captures[4].parse::<FlowPartitionId>().unwrap();
166        Ok(TableFlowKeyInner::new(
167            table_id,
168            flownode_id,
169            flow_id,
170            partition_id,
171        ))
172    }
173}
174
/// The value stored under a [TableFlowKey].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableFlowValue {
    /// The flownode peer. Its `id` is used as the flownode id component when
    /// the manager builds keys for this value.
    pub(crate) peer: Peer,
}
179
180/// Decodes `KeyValue` to [TableFlowKey].
181pub fn table_flow_decoder(kv: KeyValue) -> Result<(TableFlowKey, TableFlowValue)> {
182    let key = TableFlowKey::from_bytes(&kv.key)?;
183    let value = TableFlowValue::try_from_raw_value(&kv.value)?;
184    Ok((key, value))
185}
186
/// Shared, reference-counted handle to a [TableFlowManager].
pub type TableFlowManagerRef = Arc<TableFlowManager>;

/// The manager of [TableFlowKey].
pub struct TableFlowManager {
    /// Backend used for range reads of table flow keys and for looking up
    /// flownode addresses.
    kv_backend: KvBackendRef,
}
193
194impl TableFlowManager {
195    /// Returns a new [TableFlowManager].
196    pub fn new(kv_backend: KvBackendRef) -> Self {
197        Self { kv_backend }
198    }
199
200    /// Retrieves all [TableFlowKey]s of the specified `table_id`.
201    ///
202    /// TODO(discord9): add cache for it since range request does not support cache.
203    pub async fn flows(&self, table_id: TableId) -> Result<Vec<(TableFlowKey, TableFlowValue)>> {
204        let start_key = TableFlowKey::range_start_key(table_id);
205        let req = RangeRequest::new().with_prefix(start_key);
206        let stream = PaginationStream::new(
207            self.kv_backend.clone(),
208            req,
209            DEFAULT_PAGE_SIZE,
210            table_flow_decoder,
211        )
212        .into_stream();
213
214        let mut res = stream.try_collect::<Vec<_>>().await?;
215        self.remap_table_flow_addresses(&mut res).await?;
216        Ok(res)
217    }
218
219    /// Builds a create table flow transaction.
220    ///
221    /// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys.
222    pub fn build_create_txn(
223        &self,
224        flow_id: FlowId,
225        table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
226        source_table_ids: &[TableId],
227    ) -> Result<Txn> {
228        let mut txns = Vec::with_capacity(source_table_ids.len() * table_flow_values.len());
229
230        for (partition_id, table_flow_value) in table_flow_values {
231            let flownode_id = table_flow_value.peer.id;
232            let value = table_flow_value.try_as_raw_value()?;
233            for source_table_id in source_table_ids {
234                txns.push(TxnOp::Put(
235                    TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
236                        .to_bytes(),
237                    value.clone(),
238                ));
239            }
240        }
241
242        Ok(Txn::new().and_then(txns))
243    }
244
245    /// Builds a update table flow transaction.
246    ///
247    /// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys,
248    /// Also remove previous
249    /// `__flow/source_table/{table_id}/{old_node_id}/{flow_id}/{partition_id}` keys.
250    pub fn build_update_txn(
251        &self,
252        flow_id: FlowId,
253        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
254        table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
255        source_table_ids: &[TableId],
256    ) -> Result<Txn> {
257        let mut txns = Vec::with_capacity(2 * source_table_ids.len() * table_flow_values.len());
258
259        // first remove the old keys
260        for (part_id, node_id) in current_flow_info.flownode_ids() {
261            for source_table_id in current_flow_info.source_table_ids() {
262                txns.push(TxnOp::Delete(
263                    TableFlowKey::new(*source_table_id, *node_id, flow_id, *part_id).to_bytes(),
264                ));
265            }
266        }
267
268        for (partition_id, table_flow_value) in table_flow_values {
269            let flownode_id = table_flow_value.peer.id;
270            let value = table_flow_value.try_as_raw_value()?;
271            for source_table_id in source_table_ids {
272                txns.push(TxnOp::Put(
273                    TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
274                        .to_bytes(),
275                    value.clone(),
276                ));
277            }
278        }
279
280        Ok(Txn::new().and_then(txns))
281    }
282
283    async fn remap_table_flow_addresses(
284        &self,
285        table_flows: &mut [(TableFlowKey, TableFlowValue)],
286    ) -> Result<()> {
287        let keys = table_flows
288            .iter()
289            .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
290            .collect::<Vec<_>>();
291        let flownode_addrs =
292            flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
293        for (_, table_flow_value) in table_flows.iter_mut() {
294            let flownode_id = table_flow_value.peer.id;
295            // If an id lacks a corresponding address in the `flow_node_addrs`,
296            // it means the old address in `table_flow_value` is still valid,
297            // which is expected.
298            if let Some(flownode_addr) = flownode_addrs.get(&flownode_id) {
299                table_flow_value.peer.addr = flownode_addr.peer.addr.clone();
300            }
301        }
302        Ok(())
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// A full key and a range prefix both serialize under the `__flow/` scope.
    #[test]
    fn test_key_serialization() {
        let serialized = TableFlowKey::new(1024, 1, 2, 0).to_bytes();
        assert_eq!(b"__flow/source_table/1024/1/2/0".to_vec(), serialized);

        let range_prefix = TableFlowKey::range_start_key(1024);
        assert_eq!(b"__flow/source_table/1024/".to_vec(), range_prefix);
    }

    /// A serialized key parses back into its four components.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow/source_table/1024/1/2/0".to_vec();
        let parsed = TableFlowKey::from_bytes(&raw).unwrap();

        assert_eq!(parsed.source_table_id(), 1024);
        assert_eq!(parsed.flownode_id(), 1);
        assert_eq!(parsed.flow_id(), 2);
        assert_eq!(parsed.partition_id(), 0);
    }
}