common_meta/key/flow/
flownode_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::stream::BoxStream;
16use futures::TryStreamExt;
17use lazy_static::lazy_static;
18use regex::Regex;
19use snafu::OptionExt;
20
21use crate::error::{self, Result};
22use crate::key::flow::FlowScoped;
23use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey};
24use crate::kv_backend::txn::{Txn, TxnOp};
25use crate::kv_backend::KvBackendRef;
26use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
27use crate::rpc::store::RangeRequest;
28use crate::rpc::KeyValue;
29use crate::FlownodeId;
30
31lazy_static! {
32    static ref FLOWNODE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
33        "^{FLOWNODE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
34    ))
35    .unwrap();
36}
37
38const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode";
39
40/// The key of mapping [FlownodeId] to [FlowId].
41///
42/// The layout `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
43pub struct FlownodeFlowKey(FlowScoped<FlownodeFlowKeyInner>);
44
45impl<'a> MetadataKey<'a, FlownodeFlowKey> for FlownodeFlowKey {
46    fn to_bytes(&self) -> Vec<u8> {
47        self.0.to_bytes()
48    }
49
50    fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKey> {
51        Ok(FlownodeFlowKey(
52            FlowScoped::<FlownodeFlowKeyInner>::from_bytes(bytes)?,
53        ))
54    }
55}
56
57impl FlownodeFlowKey {
58    /// Returns a new [FlownodeFlowKey].
59    pub fn new(
60        flownode_id: FlownodeId,
61        flow_id: FlowId,
62        partition_id: FlowPartitionId,
63    ) -> FlownodeFlowKey {
64        let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
65        FlownodeFlowKey(FlowScoped::new(inner))
66    }
67
68    /// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`.
69    pub fn range_start_key(flownode_id: FlownodeId) -> Vec<u8> {
70        let inner = BytesAdapter::from(FlownodeFlowKeyInner::prefix(flownode_id).into_bytes());
71
72        FlowScoped::new(inner).to_bytes()
73    }
74
75    /// Returns the [FlowId].
76    pub fn flow_id(&self) -> FlowId {
77        self.0.flow_id
78    }
79
80    #[cfg(test)]
81    /// Returns the [FlownodeId].
82    pub fn flownode_id(&self) -> FlownodeId {
83        self.0.flownode_id
84    }
85
86    /// Returns the [PartitionId].
87    pub fn partition_id(&self) -> FlowPartitionId {
88        self.0.partition_id
89    }
90}
91
92/// The key of mapping [FlownodeId] to [FlowId].
93pub struct FlownodeFlowKeyInner {
94    flownode_id: FlownodeId,
95    flow_id: FlowId,
96    partition_id: FlowPartitionId,
97}
98
99impl FlownodeFlowKeyInner {
100    /// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`.
101    pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
102        Self {
103            flownode_id,
104            flow_id,
105            partition_id,
106        }
107    }
108
109    pub fn prefix(flownode_id: FlownodeId) -> String {
110        format!("{}/{flownode_id}/", FLOWNODE_FLOW_KEY_PREFIX)
111    }
112}
113
114impl<'a> MetadataKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
115    fn to_bytes(&self) -> Vec<u8> {
116        format!(
117            "{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}",
118            self.flownode_id, self.flow_id, self.partition_id,
119        )
120        .into_bytes()
121    }
122
123    fn from_bytes(bytes: &'a [u8]) -> Result<FlownodeFlowKeyInner> {
124        let key = std::str::from_utf8(bytes).map_err(|e| {
125            error::InvalidMetadataSnafu {
126                err_msg: format!(
127                    "FlownodeFlowKeyInner '{}' is not a valid UTF8 string: {e}",
128                    String::from_utf8_lossy(bytes)
129                ),
130            }
131            .build()
132        })?;
133        let captures =
134            FLOWNODE_FLOW_KEY_PATTERN
135                .captures(key)
136                .context(error::InvalidMetadataSnafu {
137                    err_msg: format!("Invalid FlownodeFlowKeyInner '{key}'"),
138                })?;
139        // Safety: pass the regex check above
140        let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
141        let flow_id = captures[2].parse::<FlowId>().unwrap();
142        let partition_id = captures[3].parse::<FlowPartitionId>().unwrap();
143
144        Ok(FlownodeFlowKeyInner {
145            flownode_id,
146            flow_id,
147            partition_id,
148        })
149    }
150}
151
152/// The manager of [FlownodeFlowKey].
153pub struct FlownodeFlowManager {
154    kv_backend: KvBackendRef,
155}
156
157/// Decodes `KeyValue` to [FlownodeFlowKey].
158pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result<FlownodeFlowKey> {
159    FlownodeFlowKey::from_bytes(&kv.key)
160}
161
162impl FlownodeFlowManager {
163    /// Returns a new [FlownodeFlowManager].
164    pub fn new(kv_backend: KvBackendRef) -> Self {
165        Self { kv_backend }
166    }
167
168    /// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
169    pub fn flows(
170        &self,
171        flownode_id: FlownodeId,
172    ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
173        let start_key = FlownodeFlowKey::range_start_key(flownode_id);
174        let req = RangeRequest::new().with_prefix(start_key);
175
176        let stream = PaginationStream::new(
177            self.kv_backend.clone(),
178            req,
179            DEFAULT_PAGE_SIZE,
180            flownode_flow_key_decoder,
181        )
182        .into_stream();
183
184        Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
185    }
186
187    /// Builds a create flownode flow transaction.
188    ///
189    /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
190    pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
191        &self,
192        flow_id: FlowId,
193        flownode_ids: I,
194    ) -> Txn {
195        let txns = flownode_ids
196            .into_iter()
197            .map(|(partition_id, flownode_id)| {
198                let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
199                TxnOp::Put(key, vec![])
200            })
201            .collect::<Vec<_>>();
202
203        Txn::new().and_then(txns)
204    }
205}
206
#[cfg(test)]
mod tests {
    use crate::key::flow::flownode_flow::FlownodeFlowKey;
    use crate::key::MetadataKey;

    /// A full key and a per-flownode prefix both carry the `__flow/` scope.
    #[test]
    fn test_key_serialization() {
        let key_bytes = FlownodeFlowKey::new(1, 2, 0).to_bytes();
        assert_eq!(b"__flow/flownode/1/2/0".to_vec(), key_bytes);

        let prefix_bytes = FlownodeFlowKey::range_start_key(1);
        assert_eq!(b"__flow/flownode/1/".to_vec(), prefix_bytes);
    }

    /// Decoding a serialized key yields the three ids in layout order.
    #[test]
    fn test_key_deserialization() {
        let raw = b"__flow/flownode/1/2/0".to_vec();
        let decoded = FlownodeFlowKey::from_bytes(&raw).unwrap();

        assert_eq!(decoded.flownode_id(), 1);
        assert_eq!(decoded.flow_id(), 2);
        assert_eq!(decoded.partition_id(), 0);
    }
}