common_meta/key/flow/
flow_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use lazy_static::lazy_static;
20use regex::Regex;
21use serde::{Deserialize, Serialize};
22use snafu::OptionExt;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::error::{self, Result};
27use crate::key::flow::FlowScoped;
28use crate::key::txn_helper::TxnOpGetResponseSet;
29use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
30use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::FlownodeId;
33
/// The key segment that marks flow info entries; [FlowInfoKeyInner] serializes to
/// `info/{flow_id}`.
const FLOW_INFO_KEY_PREFIX: &str = "info";

lazy_static! {
    // Matches a whole `info/{flow_id}` key (anchored at both ends) and captures the
    // run of ASCII digits that forms the flow id as group 1.
    static ref FLOW_INFO_KEY_PATTERN: Regex =
        Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}
40
/// The key stores the metadata of the flow.
///
/// The layout: `__flow/info/{flow_id}`.
///
/// It wraps a [FlowInfoKeyInner] in [FlowScoped]; encoding and decoding are delegated
/// to that wrapper.
pub struct FlowInfoKey(FlowScoped<FlowInfoKeyInner>);
45
/// The result of decoding an optional [FlowInfoValue] (kept together with its raw bytes)
/// out of a transaction response; this is what the decoders returned by the
/// [FlowInfoManager] transaction builders produce.
pub type FlowInfoDecodeResult = Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>;
47
48impl<'a> MetadataKey<'a, FlowInfoKey> for FlowInfoKey {
49    fn to_bytes(&self) -> Vec<u8> {
50        self.0.to_bytes()
51    }
52
53    fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKey> {
54        Ok(FlowInfoKey(FlowScoped::<FlowInfoKeyInner>::from_bytes(
55            bytes,
56        )?))
57    }
58}
59
60impl FlowInfoKey {
61    /// Returns the [FlowInfoKey].
62    pub fn new(flow_id: FlowId) -> FlowInfoKey {
63        let inner = FlowInfoKeyInner::new(flow_id);
64        FlowInfoKey(FlowScoped::new(inner))
65    }
66
67    /// Returns the [FlowId].
68    pub fn flow_id(&self) -> FlowId {
69        self.0.flow_id
70    }
71}
72
/// The key of flow metadata.
///
/// Serialized as `info/{flow_id}`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowInfoKeyInner {
    /// The id of the flow this key addresses.
    flow_id: FlowId,
}
78
79impl FlowInfoKeyInner {
80    /// Returns a [FlowInfoKey] with the specified `flow_id`.
81    pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
82        FlowInfoKeyInner { flow_id }
83    }
84}
85
86impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
87    fn to_bytes(&self) -> Vec<u8> {
88        format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes()
89    }
90
91    fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKeyInner> {
92        let key = std::str::from_utf8(bytes).map_err(|e| {
93            error::InvalidMetadataSnafu {
94                err_msg: format!(
95                    "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
96                    String::from_utf8_lossy(bytes)
97                ),
98            }
99            .build()
100        })?;
101        let captures =
102            FLOW_INFO_KEY_PATTERN
103                .captures(key)
104                .context(error::InvalidMetadataSnafu {
105                    err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
106                })?;
107        // Safety: pass the regex check above
108        let flow_id = captures[1].parse::<FlowId>().unwrap();
109        Ok(FlowInfoKeyInner { flow_id })
110    }
111}
112
/// The metadata of the flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
    /// The source tables used by the flow.
    pub(crate) source_table_ids: Vec<TableId>,
    /// The sink table used by the flow.
    pub(crate) sink_table_name: TableName,
    /// Which flow nodes this flow is running on, keyed by flow partition id.
    pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
    /// The catalog name.
    pub(crate) catalog_name: String,
    /// The query context used when the flow was created.
    ///
    /// Although a flow doesn't belong to any schema, the query context in effect when
    /// `create_flow` was executed is remembered so that the flow can be recovered with
    /// the same sql and query context after a db restart.
    /// If `None`, the default query context should be used.
    #[serde(default)]
    pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
    /// The flow name.
    pub(crate) flow_name: String,
    /// The raw sql.
    pub(crate) raw_sql: String,
    /// The expire-after expression of the flow.
    /// Duration in seconds as `i64`.
    pub(crate) expire_after: Option<i64>,
    /// The comment.
    pub(crate) comment: String,
    /// The options.
    pub(crate) options: HashMap<String, String>,
    /// The created time.
    #[serde(default)]
    pub(crate) created_time: DateTime<Utc>,
    /// The updated time.
    #[serde(default)]
    pub(crate) updated_time: DateTime<Utc>,
}
149
150impl FlowInfoValue {
151    /// Returns the `flownode_id`.
152    pub fn flownode_ids(&self) -> &BTreeMap<FlowPartitionId, FlownodeId> {
153        &self.flownode_ids
154    }
155
156    /// Returns the `source_table`.
157    pub fn source_table_ids(&self) -> &[TableId] {
158        &self.source_table_ids
159    }
160
161    pub fn catalog_name(&self) -> &String {
162        &self.catalog_name
163    }
164
165    pub fn query_context(&self) -> &Option<crate::rpc::ddl::QueryContext> {
166        &self.query_context
167    }
168
169    pub fn flow_name(&self) -> &String {
170        &self.flow_name
171    }
172
173    pub fn sink_table_name(&self) -> &TableName {
174        &self.sink_table_name
175    }
176
177    pub fn raw_sql(&self) -> &String {
178        &self.raw_sql
179    }
180
181    pub fn expire_after(&self) -> Option<i64> {
182        self.expire_after
183    }
184
185    pub fn comment(&self) -> &String {
186        &self.comment
187    }
188
189    pub fn options(&self) -> &HashMap<String, String> {
190        &self.options
191    }
192
193    pub fn created_time(&self) -> &DateTime<Utc> {
194        &self.created_time
195    }
196
197    pub fn updated_time(&self) -> &DateTime<Utc> {
198        &self.updated_time
199    }
200}
201
/// A shared, reference-counted handle to a [FlowInfoManager].
pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
203
/// The manager of [FlowInfoKey].
pub struct FlowInfoManager {
    /// The key-value backend that `get` and `get_raw` read flow info entries from.
    kv_backend: KvBackendRef,
}
208
209impl FlowInfoManager {
210    /// Returns a new [FlowInfoManager].
211    pub fn new(kv_backend: KvBackendRef) -> Self {
212        Self { kv_backend }
213    }
214
215    /// Returns the [FlowInfoValue] of specified `flow_id`.
216    pub async fn get(&self, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
217        let key = FlowInfoKey::new(flow_id).to_bytes();
218        self.kv_backend
219            .get(&key)
220            .await?
221            .map(|x| FlowInfoValue::try_from_raw_value(&x.value))
222            .transpose()
223    }
224
225    /// Returns the [FlowInfoValue] with original bytes of specified `flow_id`.
226    pub async fn get_raw(
227        &self,
228        flow_id: FlowId,
229    ) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
230        let key = FlowInfoKey::new(flow_id).to_bytes();
231        self.kv_backend
232            .get(&key)
233            .await?
234            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
235            .transpose()
236    }
237
238    /// Builds a create flow transaction.
239    /// It is expected that the `__flow/info/{flow_id}` wasn't occupied.
240    /// Otherwise, the transaction will retrieve existing value.
241    pub(crate) fn build_create_txn(
242        &self,
243        flow_id: FlowId,
244        flow_value: &FlowInfoValue,
245    ) -> Result<(
246        Txn,
247        impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
248    )> {
249        let key = FlowInfoKey::new(flow_id).to_bytes();
250        let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?);
251
252        Ok((
253            txn,
254            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
255        ))
256    }
257
258    /// Builds a update flow transaction.
259    /// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied and equal to `prev_flow_value`,
260    /// but the new value can be the same, so to allow replace operation to happen even when the value is the same.
261    /// Otherwise, the transaction will retrieve existing value and fail.
262    pub(crate) fn build_update_txn(
263        &self,
264        flow_id: FlowId,
265        current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
266        new_flow_value: &FlowInfoValue,
267    ) -> Result<(
268        Txn,
269        impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
270    )> {
271        let key = FlowInfoKey::new(flow_id).to_bytes();
272        let raw_value = new_flow_value.try_as_raw_value()?;
273        let prev_value = current_flow_value.get_raw_bytes();
274        let txn = Txn::new()
275            .when(vec![
276                Compare::new(key.clone(), CompareOp::NotEqual, None),
277                Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
278            ])
279            .and_then(vec![TxnOp::Put(key.clone(), raw_value)])
280            .or_else(vec![TxnOp::Get(key.clone())]);
281
282        Ok((
283            txn,
284            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
285        ))
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// A key must serialize to the `__flow/info/{flow_id}` layout.
    #[test]
    fn test_key_serialization() {
        let expected = b"__flow/info/2".to_vec();
        let key = FlowInfoKey::new(2);
        assert_eq!(expected, key.to_bytes());
    }

    /// A serialized key must decode back to the flow id it was built from.
    #[test]
    fn test_key_deserialization() {
        let raw: &[u8] = b"__flow/info/2";
        let decoded = FlowInfoKey::from_bytes(raw).unwrap();
        assert_eq!(decoded.flow_id(), 2);
    }
}