common_meta/key/flow/
flow_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::warn;
16use futures::stream::BoxStream;
17use lazy_static::lazy_static;
18use regex::Regex;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21
22use crate::error::{self, Result};
23use crate::key::flow::FlowScoped;
24use crate::key::txn_helper::TxnOpGetResponseSet;
25use crate::key::{
26    BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN,
27};
28use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
29use crate::kv_backend::KvBackendRef;
30use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
31use crate::rpc::store::RangeRequest;
32use crate::rpc::KeyValue;
33
34const FLOW_NAME_KEY_PREFIX: &str = "name";
35
36lazy_static! {
37    static ref FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
38        "^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
39    ))
40    .unwrap();
41
42    /// for compatibility with older flow name with less strict name pattern
43    static ref COMPAT_FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
44        "^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/(.*)$"
45    ))
46    .unwrap();
47}
48
49/// The key of mapping {flow_name} to [FlowId].
50///
51/// The layout: `__flow/name/{catalog_name}/{flow_name}`.
52pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
53
54pub type FlowNameDecodeResult = Result<Option<DeserializedValueWithBytes<FlowNameValue>>>;
55
56#[allow(dead_code)]
57impl<'a> FlowNameKey<'a> {
58    /// Returns the [FlowNameKey]
59    pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> {
60        let inner = FlowNameKeyInner::new(catalog, flow_name);
61        FlowNameKey(FlowScoped::new(inner))
62    }
63
64    pub fn range_start_key(catalog: &str) -> Vec<u8> {
65        let inner = BytesAdapter::from(Self::prefix(catalog).into_bytes());
66
67        FlowScoped::new(inner).to_bytes()
68    }
69
70    /// Return `name/{catalog}/` as prefix
71    pub fn prefix(catalog: &str) -> String {
72        format!("{}/{}/", FLOW_NAME_KEY_PREFIX, catalog)
73    }
74
75    /// Returns the catalog.
76    pub fn catalog(&self) -> &str {
77        self.0.catalog_name
78    }
79
80    /// Return the `flow_name`
81    pub fn flow_name(&self) -> &str {
82        self.0.flow_name
83    }
84}
85
86impl<'a> MetadataKey<'a, FlowNameKey<'a>> for FlowNameKey<'a> {
87    fn to_bytes(&self) -> Vec<u8> {
88        self.0.to_bytes()
89    }
90
91    fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKey<'a>> {
92        Ok(FlowNameKey(FlowScoped::<FlowNameKeyInner>::from_bytes(
93            bytes,
94        )?))
95    }
96}
97
98/// The key of mapping name to [FlowId]
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct FlowNameKeyInner<'a> {
101    pub catalog_name: &'a str,
102    pub flow_name: &'a str,
103}
104
105impl<'a> MetadataKey<'a, FlowNameKeyInner<'a>> for FlowNameKeyInner<'_> {
106    fn to_bytes(&self) -> Vec<u8> {
107        format!(
108            "{FLOW_NAME_KEY_PREFIX}/{}/{}",
109            self.catalog_name, self.flow_name
110        )
111        .into_bytes()
112    }
113
114    fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKeyInner<'a>> {
115        let key = std::str::from_utf8(bytes).map_err(|e| {
116            error::InvalidMetadataSnafu {
117                err_msg: format!(
118                    "FlowNameKeyInner '{}' is not a valid UTF8 string: {e}",
119                    String::from_utf8_lossy(bytes)
120                ),
121            }
122            .build()
123        })?;
124        let captures = FLOW_NAME_KEY_PATTERN
125            .captures(key)
126            .or_else(|| {
127                warn!(
128                    "FlowNameKeyInner '{}' is not a valid flow name in newer version.",
129                    key
130                );
131                COMPAT_FLOW_NAME_KEY_PATTERN.captures(key)
132            })
133            .context(error::InvalidMetadataSnafu {
134                err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
135            })?;
136        // Safety: pass the regex check above
137        let catalog_name = captures.get(1).unwrap().as_str();
138        let flow_name = captures.get(2).unwrap().as_str();
139        Ok(FlowNameKeyInner {
140            catalog_name,
141            flow_name,
142        })
143    }
144}
145
146impl<'a> FlowNameKeyInner<'a> {
147    /// Returns a [FlowNameKeyInner].
148    pub fn new(catalog_name: &'a str, flow_name: &'a str) -> Self {
149        Self {
150            catalog_name,
151            flow_name,
152        }
153    }
154}
155
156/// The value of [FlowNameKey].
157#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
158pub struct FlowNameValue {
159    flow_id: FlowId,
160}
161
162impl FlowNameValue {
163    /// Returns a [FlowNameValue] with specified [FlowId].
164    pub fn new(flow_id: FlowId) -> Self {
165        Self { flow_id }
166    }
167
168    /// Returns the [FlowId]
169    pub fn flow_id(&self) -> FlowId {
170        self.flow_id
171    }
172}
173
174pub fn flow_name_decoder(kv: KeyValue) -> Result<(String, FlowNameValue)> {
175    let flow_name = FlowNameKey::from_bytes(&kv.key)?;
176    let flow_id = FlowNameValue::try_from_raw_value(&kv.value)?;
177    Ok((flow_name.flow_name().to_string(), flow_id))
178}
179
180/// The manager of [FlowNameKey].
181pub struct FlowNameManager {
182    kv_backend: KvBackendRef,
183}
184
185impl FlowNameManager {
186    /// Returns a new [FlowNameManager].
187    pub fn new(kv_backend: KvBackendRef) -> Self {
188        Self { kv_backend }
189    }
190
191    /// Returns the [FlowNameValue] of specified `catalog.flow`.
192    pub async fn get(&self, catalog: &str, flow: &str) -> Result<Option<FlowNameValue>> {
193        let key = FlowNameKey::new(catalog, flow);
194        let raw_key = key.to_bytes();
195        self.kv_backend
196            .get(&raw_key)
197            .await?
198            .map(|x| FlowNameValue::try_from_raw_value(&x.value))
199            .transpose()
200    }
201
202    /// Return all flows' names and ids
203    pub async fn flow_names(
204        &self,
205        catalog: &str,
206    ) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
207        let start_key = FlowNameKey::range_start_key(catalog);
208        common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
209        let req = RangeRequest::new().with_prefix(start_key);
210
211        let stream = PaginationStream::new(
212            self.kv_backend.clone(),
213            req,
214            DEFAULT_PAGE_SIZE,
215            flow_name_decoder,
216        )
217        .into_stream();
218
219        Box::pin(stream)
220    }
221
222    /// Returns true if the `flow` exists.
223    pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
224        let key = FlowNameKey::new(catalog, flow);
225        let raw_key = key.to_bytes();
226        self.kv_backend.exists(&raw_key).await
227    }
228
229    /// Builds a create flow name transaction.
230    /// It's expected that the `__flow/name/{catalog}/{flow_name}` wasn't occupied.
231    /// Otherwise, the transaction will retrieve existing value.
232    pub fn build_create_txn(
233        &self,
234        catalog_name: &str,
235        flow_name: &str,
236        flow_id: FlowId,
237    ) -> Result<(
238        Txn,
239        impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
240    )> {
241        let key = FlowNameKey::new(catalog_name, flow_name);
242        let raw_key = key.to_bytes();
243        let flow_flow_name_value = FlowNameValue::new(flow_id);
244        let txn = Txn::put_if_not_exists(raw_key.clone(), flow_flow_name_value.try_as_raw_value()?);
245
246        Ok((
247            txn,
248            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
249        ))
250    }
251
252    /// Builds a update flow name transaction. Which doesn't change either the name or id, just checking if they are the same.
253    /// It's expected that the `__flow/name/{catalog}/{flow_name}` IS already occupied,
254    /// and both flow name and flow id is the same.
255    /// Otherwise, the transaction will retrieve existing value(and fail).
256    pub fn build_update_txn(
257        &self,
258        catalog_name: &str,
259        flow_name: &str,
260        flow_id: FlowId,
261    ) -> Result<(
262        Txn,
263        impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
264    )> {
265        let key = FlowNameKey::new(catalog_name, flow_name);
266        let raw_key = key.to_bytes();
267        let flow_flow_name_value = FlowNameValue::new(flow_id);
268        let raw_value = flow_flow_name_value.try_as_raw_value()?;
269        let txn = Txn::new()
270            .when(vec![Compare::new(
271                raw_key.clone(),
272                CompareOp::Equal,
273                Some(raw_value),
274            )])
275            .or_else(vec![TxnOp::Get(raw_key.clone())]);
276
277        Ok((
278            txn,
279            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
280        ))
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// A key encodes to its scoped, slash-separated layout.
    #[test]
    fn test_key_serialization() {
        let encoded = FlowNameKey::new("my_catalog", "my_task").to_bytes();
        assert_eq!(b"__flow/name/my_catalog/my_task".to_vec(), encoded);
    }

    /// Encoded keys decode back into their catalog and flow name.
    #[test]
    fn test_key_deserialization() {
        // (raw key, expected catalog, expected flow name); the second case
        // covers compatibility with older versions.
        let cases: [(&[u8], &str, &str); 2] = [
            (b"__flow/name/my_catalog/my_task", "my_catalog", "my_task"),
            (b"__flow/name/my_catalog/a/`b`", "my_catalog", "a/`b`"),
        ];

        for &(raw, catalog, flow_name) in &cases {
            let bytes = raw.to_vec();
            let key = FlowNameKey::from_bytes(&bytes).unwrap();
            assert_eq!(key.catalog(), catalog);
            assert_eq!(key.flow_name(), flow_name);
        }
    }

    /// The range start key is the scoped catalog prefix, trailing slash included.
    #[test]
    fn test_key_start_range() {
        let start_key = FlowNameKey::range_start_key("greptime");
        assert_eq!(b"__flow/name/greptime/".to_vec(), start_key);
    }
}