common_meta/key/flow/
flow_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::TryStreamExt;
16use lazy_static::lazy_static;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::flow_info::FlowInfoValue;
23use crate::key::flow::{flownode_addr_helper, FlowScoped};
24use crate::key::node_address::NodeAddressKey;
25use crate::key::{
26    BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
27};
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
32use crate::rpc::store::RangeRequest;
33use crate::rpc::KeyValue;
34
35const FLOW_ROUTE_KEY_PREFIX: &str = "route";
36
37lazy_static! {
38    static ref FLOW_ROUTE_KEY_PATTERN: Regex =
39        Regex::new(&format!("^{FLOW_ROUTE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
40}
41
42/// The key stores the route info of the flow.
43///
44/// The layout: `__flow/route/{flow_id}/{partition_id}`.
45#[derive(Debug, Clone, PartialEq)]
46pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
47
48impl FlowRouteKey {
49    /// Returns a new [FlowRouteKey].
50    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKey {
51        let inner = FlowRouteKeyInner::new(flow_id, partition_id);
52        FlowRouteKey(FlowScoped::new(inner))
53    }
54
55    /// The prefix used to retrieve all [FlowRouteKey]s with the specified `flow_id`.
56    pub fn range_start_key(flow_id: FlowId) -> Vec<u8> {
57        let inner = BytesAdapter::from(FlowRouteKeyInner::prefix(flow_id).into_bytes());
58
59        FlowScoped::new(inner).to_bytes()
60    }
61
62    /// Returns the [`FlowId`]
63    pub fn flow_id(&self) -> FlowId {
64        self.0.flow_id
65    }
66
67    /// Returns the [`FlowPartitionId`]
68    pub fn partition_id(&self) -> FlowPartitionId {
69        self.0.partition_id
70    }
71}
72
73impl<'a> MetadataKey<'a, FlowRouteKey> for FlowRouteKey {
74    fn to_bytes(&self) -> Vec<u8> {
75        self.0.to_bytes()
76    }
77
78    fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKey> {
79        Ok(FlowRouteKey(FlowScoped::<FlowRouteKeyInner>::from_bytes(
80            bytes,
81        )?))
82    }
83}
84
85/// The key of flow route metadata.
86#[derive(Debug, Clone, Copy, PartialEq)]
87struct FlowRouteKeyInner {
88    flow_id: FlowId,
89    partition_id: FlowPartitionId,
90}
91
92impl FlowRouteKeyInner {
93    /// Returns a [FlowRouteKeyInner] with the specified `flow_id` and `partition_id`.
94    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKeyInner {
95        FlowRouteKeyInner {
96            flow_id,
97            partition_id,
98        }
99    }
100
101    fn prefix(flow_id: FlowId) -> String {
102        format!("{}/{flow_id}/", FLOW_ROUTE_KEY_PREFIX)
103    }
104}
105
106impl<'a> MetadataKey<'a, FlowRouteKeyInner> for FlowRouteKeyInner {
107    fn to_bytes(&self) -> Vec<u8> {
108        format!(
109            "{FLOW_ROUTE_KEY_PREFIX}/{}/{}",
110            self.flow_id, self.partition_id
111        )
112        .into_bytes()
113    }
114
115    fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKeyInner> {
116        let key = std::str::from_utf8(bytes).map_err(|e| {
117            error::InvalidMetadataSnafu {
118                err_msg: format!(
119                    "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
120                    String::from_utf8_lossy(bytes)
121                ),
122            }
123            .build()
124        })?;
125        let captures =
126            FLOW_ROUTE_KEY_PATTERN
127                .captures(key)
128                .context(error::InvalidMetadataSnafu {
129                    err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
130                })?;
131        // Safety: pass the regex check above
132        let flow_id = captures[1].parse::<FlowId>().unwrap();
133        let partition_id = captures[2].parse::<FlowId>().unwrap();
134
135        Ok(FlowRouteKeyInner {
136            flow_id,
137            partition_id,
138        })
139    }
140}
141
142/// The route info of flow.
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
144pub struct FlowRouteValue {
145    pub(crate) peer: Peer,
146}
147
148impl From<Peer> for FlowRouteValue {
149    fn from(peer: Peer) -> Self {
150        Self { peer }
151    }
152}
153
154impl FlowRouteValue {
155    /// Returns the `peer`.
156    pub fn peer(&self) -> &Peer {
157        &self.peer
158    }
159}
160
161/// Decodes `KeyValue` to ([`FlowRouteKey`],[`FlowRouteValue`]).
162pub fn flow_route_decoder(kv: KeyValue) -> Result<(FlowRouteKey, FlowRouteValue)> {
163    let key = FlowRouteKey::from_bytes(&kv.key)?;
164    let value = FlowRouteValue::try_from_raw_value(&kv.value)?;
165    Ok((key, value))
166}
167
168/// The manager of [FlowRouteKey].
169pub struct FlowRouteManager {
170    kv_backend: KvBackendRef,
171}
172
173impl FlowRouteManager {
174    /// Returns a new [FlowRouteManager].
175    pub fn new(kv_backend: KvBackendRef) -> Self {
176        Self { kv_backend }
177    }
178
179    /// Retrieves all [FlowRouteValue]s of the specified `flow_id`.
180    pub async fn routes(&self, flow_id: FlowId) -> Result<Vec<(FlowRouteKey, FlowRouteValue)>> {
181        let start_key = FlowRouteKey::range_start_key(flow_id);
182        let req = RangeRequest::new().with_prefix(start_key);
183        let stream = PaginationStream::new(
184            self.kv_backend.clone(),
185            req,
186            DEFAULT_PAGE_SIZE,
187            flow_route_decoder,
188        )
189        .into_stream();
190
191        let mut res = stream.try_collect::<Vec<_>>().await?;
192        self.remap_flow_route_addresses(&mut res).await?;
193        Ok(res)
194    }
195
196    /// Builds a create flow routes transaction.
197    ///
198    /// Puts `__flow/route/{flow_id}/{partition_id}` keys.
199    pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
200        &self,
201        flow_id: FlowId,
202        flow_routes: I,
203    ) -> Result<Txn> {
204        let txns = flow_routes
205            .into_iter()
206            .map(|(partition_id, route)| {
207                let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
208
209                Ok(TxnOp::Put(key, route.try_as_raw_value()?))
210            })
211            .collect::<Result<Vec<_>>>()?;
212
213        Ok(Txn::new().and_then(txns))
214    }
215
216    /// Builds a update flow routes transaction.
217    ///
218    /// Puts `__flow/route/{flow_id}/{partition_id}` keys.
219    /// Also removes `__flow/route/{flow_id}/{old_partition_id}` keys.
220    pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
221        &self,
222        flow_id: FlowId,
223        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
224        flow_routes: I,
225    ) -> Result<Txn> {
226        let del_txns = current_flow_info.flownode_ids().keys().map(|partition_id| {
227            let key = FlowRouteKey::new(flow_id, *partition_id).to_bytes();
228            Ok(TxnOp::Delete(key))
229        });
230
231        let put_txns = flow_routes.into_iter().map(|(partition_id, route)| {
232            let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
233
234            Ok(TxnOp::Put(key, route.try_as_raw_value()?))
235        });
236        let txns = del_txns.chain(put_txns).collect::<Result<Vec<_>>>()?;
237        Ok(Txn::new().and_then(txns))
238    }
239
240    async fn remap_flow_route_addresses(
241        &self,
242        flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
243    ) -> Result<()> {
244        let keys = flow_routes
245            .iter()
246            .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
247            .collect();
248        let flow_node_addrs =
249            flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
250        for (_, flow_route_value) in flow_routes.iter_mut() {
251            let flownode_id = flow_route_value.peer.id;
252            // If an id lacks a corresponding address in the `flow_node_addrs`,
253            // it means the old address in `table_flow_value` is still valid,
254            // which is expected.
255            if let Some(node_addr) = flow_node_addrs.get(&flownode_id) {
256                flow_route_value.peer.addr = node_addr.peer.addr.clone();
257            }
258        }
259        Ok(())
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::FlowRouteKey;
266    use crate::key::MetadataKey;
267
268    #[test]
269    fn test_key_serialization() {
270        let flow_route_key = FlowRouteKey::new(1, 2);
271        assert_eq!(b"__flow/route/1/2".to_vec(), flow_route_key.to_bytes());
272    }
273
274    #[test]
275    fn test_key_deserialization() {
276        let bytes = b"__flow/route/1/2".to_vec();
277        let key = FlowRouteKey::from_bytes(&bytes).unwrap();
278        assert_eq!(key.flow_id(), 1);
279        assert_eq!(key.partition_id(), 2);
280    }
281
282    #[test]
283    fn test_key_start_range() {
284        assert_eq!(
285            b"__flow/route/2/".to_vec(),
286            FlowRouteKey::range_start_key(2)
287        );
288    }
289}