common_meta/key/flow/
flownode_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::stream::BoxStream;
16use futures::TryStreamExt;
17use lazy_static::lazy_static;
18use regex::Regex;
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::flow_info::FlowInfoValue;
23use crate::key::flow::FlowScoped;
24use crate::key::{BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
25use crate::kv_backend::txn::{Txn, TxnOp};
26use crate::kv_backend::KvBackendRef;
27use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
28use crate::rpc::store::RangeRequest;
29use crate::rpc::KeyValue;
30use crate::FlownodeId;
31
32lazy_static! {
33    static ref FLOWNODE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
34        "^{FLOWNODE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
35    ))
36    .unwrap();
37}
38
39const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode";
40
/// The key that maps a [FlownodeId] to a [FlowId] and [FlowPartitionId].
///
/// Serialized layout: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`.
///
/// This is a thin wrapper around a [FlowScoped] [FlownodeFlowKeyInner]: the inner
/// key renders the `flownode/...` part, and the leading `__flow/` is presumably
/// contributed by [FlowScoped] (its definition is not in this file).
pub struct FlownodeFlowKey(FlowScoped<FlownodeFlowKeyInner>);
45
46impl<'a> MetadataKey<'a, FlownodeFlowKey> for FlownodeFlowKey {
47    fn to_bytes(&self) -> Vec<u8> {
48        self.0.to_bytes()
49    }
50
51    fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKey> {
52        Ok(FlownodeFlowKey(
53            FlowScoped::<FlownodeFlowKeyInner>::from_bytes(bytes)?,
54        ))
55    }
56}
57
58impl FlownodeFlowKey {
59    /// Returns a new [FlownodeFlowKey].
60    pub fn new(
61        flownode_id: FlownodeId,
62        flow_id: FlowId,
63        partition_id: FlowPartitionId,
64    ) -> FlownodeFlowKey {
65        let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
66        FlownodeFlowKey(FlowScoped::new(inner))
67    }
68
69    /// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`.
70    pub fn range_start_key(flownode_id: FlownodeId) -> Vec<u8> {
71        let inner = BytesAdapter::from(FlownodeFlowKeyInner::prefix(flownode_id).into_bytes());
72
73        FlowScoped::new(inner).to_bytes()
74    }
75
76    /// Returns the [FlowId].
77    pub fn flow_id(&self) -> FlowId {
78        self.0.flow_id
79    }
80
81    #[cfg(test)]
82    /// Returns the [FlownodeId].
83    pub fn flownode_id(&self) -> FlownodeId {
84        self.0.flownode_id
85    }
86
87    /// Returns the [PartitionId].
88    pub fn partition_id(&self) -> FlowPartitionId {
89        self.0.partition_id
90    }
91}
92
/// The unscoped part of the key that maps a [FlownodeId] to a [FlowId].
///
/// Serialized as `flownode/{flownode_id}/{flow_id}/{partition_id}`.
pub struct FlownodeFlowKeyInner {
    /// First numeric component of the serialized key.
    flownode_id: FlownodeId,
    /// Second numeric component of the serialized key.
    flow_id: FlowId,
    /// Third numeric component of the serialized key.
    partition_id: FlowPartitionId,
}
99
100impl FlownodeFlowKeyInner {
101    /// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`.
102    pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
103        Self {
104            flownode_id,
105            flow_id,
106            partition_id,
107        }
108    }
109
110    pub fn prefix(flownode_id: FlownodeId) -> String {
111        format!("{}/{flownode_id}/", FLOWNODE_FLOW_KEY_PREFIX)
112    }
113}
114
115impl<'a> MetadataKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
116    fn to_bytes(&self) -> Vec<u8> {
117        format!(
118            "{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}",
119            self.flownode_id, self.flow_id, self.partition_id,
120        )
121        .into_bytes()
122    }
123
124    fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKeyInner> {
125        let key = std::str::from_utf8(bytes).map_err(|e| {
126            error::InvalidMetadataSnafu {
127                err_msg: format!(
128                    "FlownodeFlowKeyInner '{}' is not a valid UTF8 string: {e}",
129                    String::from_utf8_lossy(bytes)
130                ),
131            }
132            .build()
133        })?;
134        let captures =
135            FLOWNODE_FLOW_KEY_PATTERN
136                .captures(key)
137                .context(error::InvalidMetadataSnafu {
138                    err_msg: format!("Invalid FlownodeFlowKeyInner '{key}'"),
139                })?;
140        // Safety: pass the regex check above
141        let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
142        let flow_id = captures[2].parse::<FlowId>().unwrap();
143        let partition_id = captures[3].parse::<FlowPartitionId>().unwrap();
144
145        Ok(FlownodeFlowKeyInner {
146            flownode_id,
147            flow_id,
148            partition_id,
149        })
150    }
151}
152
/// The manager of [FlownodeFlowKey].
///
/// Reads keys from the wrapped kv backend and builds (but does not execute) the
/// transactions that create or replace them.
pub struct FlownodeFlowManager {
    /// Backend used for point lookups and prefix scans.
    kv_backend: KvBackendRef,
}
157
158/// Decodes `KeyValue` to [FlownodeFlowKey].
159pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result<FlownodeFlowKey> {
160    FlownodeFlowKey::from_bytes(&kv.key)
161}
162
163impl FlownodeFlowManager {
164    /// Returns a new [FlownodeFlowManager].
165    pub fn new(kv_backend: KvBackendRef) -> Self {
166        Self { kv_backend }
167    }
168
169    /// Whether given flow exist on this flownode.
170    pub async fn exists(
171        &self,
172        flownode_id: FlownodeId,
173        flow_id: FlowId,
174        partition_id: FlowPartitionId,
175    ) -> Result<bool> {
176        let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
177        Ok(self.kv_backend.get(&key).await?.is_some())
178    }
179
180    /// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
181    pub fn flows(
182        &self,
183        flownode_id: FlownodeId,
184    ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
185        let start_key = FlownodeFlowKey::range_start_key(flownode_id);
186        let req = RangeRequest::new().with_prefix(start_key);
187
188        let stream = PaginationStream::new(
189            self.kv_backend.clone(),
190            req,
191            DEFAULT_PAGE_SIZE,
192            flownode_flow_key_decoder,
193        )
194        .into_stream();
195
196        Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
197    }
198
199    /// Builds a create flownode flow transaction.
200    ///
201    /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
202    pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
203        &self,
204        flow_id: FlowId,
205        flownode_ids: I,
206    ) -> Txn {
207        let txns = flownode_ids
208            .into_iter()
209            .map(|(partition_id, flownode_id)| {
210                let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
211                TxnOp::Put(key, vec![])
212            })
213            .collect::<Vec<_>>();
214
215        Txn::new().and_then(txns)
216    }
217
218    /// Builds a update flownode flow transaction.
219    ///
220    /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
221    /// Remove the old `__flownode_flow/{old_flownode_id}/{flow_id}/{old_partition_id}` keys.
222    pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
223        &self,
224        flow_id: FlowId,
225        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
226        flownode_ids: I,
227    ) -> Txn {
228        let del_txns =
229            current_flow_info
230                .flownode_ids()
231                .iter()
232                .map(|(partition_id, flownode_id)| {
233                    let key = FlownodeFlowKey::new(*flownode_id, flow_id, *partition_id).to_bytes();
234                    TxnOp::Delete(key)
235                });
236        let put_txns = flownode_ids.into_iter().map(|(partition_id, flownode_id)| {
237            let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
238            TxnOp::Put(key, vec![])
239        });
240        let txns = del_txns.chain(put_txns).collect::<Vec<_>>();
241
242        Txn::new().and_then(txns)
243    }
244}
245
#[cfg(test)]
mod tests {
    use crate::key::flow::flownode_flow::FlownodeFlowKey;
    use crate::key::MetadataKey;

    /// A key and its range prefix serialize to the documented `__flow/flownode/...` layout.
    #[test]
    fn test_key_serialization() {
        let serialized = FlownodeFlowKey::new(1, 2, 0).to_bytes();
        assert_eq!(serialized, b"__flow/flownode/1/2/0".to_vec());

        let range_prefix = FlownodeFlowKey::range_start_key(1);
        assert_eq!(range_prefix, b"__flow/flownode/1/".to_vec());
    }

    /// Parsing a serialized key recovers all three components.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow/flownode/1/2/0".to_vec();
        let parsed = FlownodeFlowKey::from_bytes(&raw).unwrap();
        assert_eq!(
            (parsed.flownode_id(), parsed.flow_id(), parsed.partition_id()),
            (1, 2, 0)
        );
    }
}