common_meta/key/flow/
flow_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::TryStreamExt;
16use lazy_static::lazy_static;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::{flownode_addr_helper, FlowScoped};
23use crate::key::node_address::NodeAddressKey;
24use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
25use crate::kv_backend::txn::{Txn, TxnOp};
26use crate::kv_backend::KvBackendRef;
27use crate::peer::Peer;
28use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
29use crate::rpc::store::RangeRequest;
30use crate::rpc::KeyValue;
31
/// The path segment that starts every unscoped flow route key
/// (`route/{flow_id}/{partition_id}`).
const FLOW_ROUTE_KEY_PREFIX: &str = "route";

lazy_static! {
    /// Matches the unscoped key form `route/{flow_id}/{partition_id}` and
    /// captures the two ids as groups 1 and 2.
    ///
    /// The pattern only checks that the ids consist of ASCII digits; it does
    /// not bound their magnitude.
    static ref FLOW_ROUTE_KEY_PATTERN: Regex =
        Regex::new(&format!("^{FLOW_ROUTE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}
38
39/// The key stores the route info of the flow.
40///
41/// The layout: `__flow/route/{flow_id}/{partition_id}`.
42#[derive(Debug, PartialEq)]
43pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
44
45impl FlowRouteKey {
46    /// Returns a new [FlowRouteKey].
47    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKey {
48        let inner = FlowRouteKeyInner::new(flow_id, partition_id);
49        FlowRouteKey(FlowScoped::new(inner))
50    }
51
52    /// The prefix used to retrieve all [FlowRouteKey]s with the specified `flow_id`.
53    pub fn range_start_key(flow_id: FlowId) -> Vec<u8> {
54        let inner = BytesAdapter::from(FlowRouteKeyInner::prefix(flow_id).into_bytes());
55
56        FlowScoped::new(inner).to_bytes()
57    }
58
59    /// Returns the [`FlowId`]
60    pub fn flow_id(&self) -> FlowId {
61        self.0.flow_id
62    }
63
64    /// Returns the [`FlowPartitionId`]
65    pub fn partition_id(&self) -> FlowPartitionId {
66        self.0.partition_id
67    }
68}
69
70impl<'a> MetadataKey<'a, FlowRouteKey> for FlowRouteKey {
71    fn to_bytes(&self) -> Vec<u8> {
72        self.0.to_bytes()
73    }
74
75    fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKey> {
76        Ok(FlowRouteKey(FlowScoped::<FlowRouteKeyInner>::from_bytes(
77            bytes,
78        )?))
79    }
80}
81
82/// The key of flow route metadata.
83#[derive(Debug, Clone, Copy, PartialEq)]
84struct FlowRouteKeyInner {
85    flow_id: FlowId,
86    partition_id: FlowPartitionId,
87}
88
89impl FlowRouteKeyInner {
90    /// Returns a [FlowRouteKeyInner] with the specified `flow_id` and `partition_id`.
91    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKeyInner {
92        FlowRouteKeyInner {
93            flow_id,
94            partition_id,
95        }
96    }
97
98    fn prefix(flow_id: FlowId) -> String {
99        format!("{}/{flow_id}/", FLOW_ROUTE_KEY_PREFIX)
100    }
101}
102
103impl<'a> MetadataKey<'a, FlowRouteKeyInner> for FlowRouteKeyInner {
104    fn to_bytes(&self) -> Vec<u8> {
105        format!(
106            "{FLOW_ROUTE_KEY_PREFIX}/{}/{}",
107            self.flow_id, self.partition_id
108        )
109        .into_bytes()
110    }
111
112    fn from_bytes(bytes: &'a [u8]) -> Result<FlowRouteKeyInner> {
113        let key = std::str::from_utf8(bytes).map_err(|e| {
114            error::InvalidMetadataSnafu {
115                err_msg: format!(
116                    "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
117                    String::from_utf8_lossy(bytes)
118                ),
119            }
120            .build()
121        })?;
122        let captures =
123            FLOW_ROUTE_KEY_PATTERN
124                .captures(key)
125                .context(error::InvalidMetadataSnafu {
126                    err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
127                })?;
128        // Safety: pass the regex check above
129        let flow_id = captures[1].parse::<FlowId>().unwrap();
130        let partition_id = captures[2].parse::<FlowId>().unwrap();
131
132        Ok(FlowRouteKeyInner {
133            flow_id,
134            partition_id,
135        })
136    }
137}
138
/// The route info of flow.
///
/// This is the value stored under a [`FlowRouteKey`]; it is written in its raw
/// serialized form by [`FlowRouteManager::build_create_txn`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowRouteValue {
    /// The flownode peer this route points to.
    ///
    /// Note that [`FlowRouteManager::routes`] may replace `peer.addr` with the
    /// address currently registered for `peer.id` before returning the value.
    pub(crate) peer: Peer,
}

impl FlowRouteValue {
    /// Returns the `peer`.
    pub fn peer(&self) -> &Peer {
        &self.peer
    }
}
151
152/// Decodes `KeyValue` to ([`FlowRouteKey`],[`FlowRouteValue`]).
153pub fn flow_route_decoder(kv: KeyValue) -> Result<(FlowRouteKey, FlowRouteValue)> {
154    let key = FlowRouteKey::from_bytes(&kv.key)?;
155    let value = FlowRouteValue::try_from_raw_value(&kv.value)?;
156    Ok((key, value))
157}
158
159/// The manager of [FlowRouteKey].
160pub struct FlowRouteManager {
161    kv_backend: KvBackendRef,
162}
163
164impl FlowRouteManager {
165    /// Returns a new [FlowRouteManager].
166    pub fn new(kv_backend: KvBackendRef) -> Self {
167        Self { kv_backend }
168    }
169
170    /// Retrieves all [FlowRouteValue]s of the specified `flow_id`.
171    pub async fn routes(&self, flow_id: FlowId) -> Result<Vec<(FlowRouteKey, FlowRouteValue)>> {
172        let start_key = FlowRouteKey::range_start_key(flow_id);
173        let req = RangeRequest::new().with_prefix(start_key);
174        let stream = PaginationStream::new(
175            self.kv_backend.clone(),
176            req,
177            DEFAULT_PAGE_SIZE,
178            flow_route_decoder,
179        )
180        .into_stream();
181
182        let mut res = stream.try_collect::<Vec<_>>().await?;
183        self.remap_flow_route_addresses(&mut res).await?;
184        Ok(res)
185    }
186
187    /// Builds a create flow routes transaction.
188    ///
189    /// Puts `__flow/route/{flow_id}/{partition_id}` keys.
190    pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
191        &self,
192        flow_id: FlowId,
193        flow_routes: I,
194    ) -> Result<Txn> {
195        let txns = flow_routes
196            .into_iter()
197            .map(|(partition_id, route)| {
198                let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
199
200                Ok(TxnOp::Put(key, route.try_as_raw_value()?))
201            })
202            .collect::<Result<Vec<_>>>()?;
203
204        Ok(Txn::new().and_then(txns))
205    }
206
207    async fn remap_flow_route_addresses(
208        &self,
209        flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
210    ) -> Result<()> {
211        let keys = flow_routes
212            .iter()
213            .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
214            .collect();
215        let flow_node_addrs =
216            flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
217        for (_, flow_route_value) in flow_routes.iter_mut() {
218            let flownode_id = flow_route_value.peer.id;
219            // If an id lacks a corresponding address in the `flow_node_addrs`,
220            // it means the old address in `table_flow_value` is still valid,
221            // which is expected.
222            if let Some(node_addr) = flow_node_addrs.get(&flownode_id) {
223                flow_route_value.peer.addr = node_addr.peer.addr.clone();
224            }
225        }
226        Ok(())
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::FlowRouteKey;
    use crate::key::MetadataKey;

    /// A key serializes to its scoped path form.
    #[test]
    fn test_key_serialization() {
        let encoded = FlowRouteKey::new(1, 2).to_bytes();
        assert_eq!(b"__flow/route/1/2".to_vec(), encoded);
    }

    /// A scoped path decodes back into the flow id and partition id.
    #[test]
    fn test_key_deserialization() {
        let encoded = b"__flow/route/1/2".to_vec();
        let decoded = FlowRouteKey::from_bytes(&encoded).unwrap();
        assert_eq!(decoded.flow_id(), 1);
        assert_eq!(decoded.partition_id(), 2);
    }

    /// The range start key is the scoped prefix ending with a slash.
    #[test]
    fn test_key_start_range() {
        let start_key = FlowRouteKey::range_start_key(2);
        assert_eq!(b"__flow/route/2/".to_vec(), start_key);
    }
}