common_meta/key/flow/
flow_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use lazy_static::lazy_static;
20use regex::Regex;
21use serde::{Deserialize, Serialize};
22use snafu::OptionExt;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::error::{self, Result};
27use crate::key::flow::FlowScoped;
28use crate::key::txn_helper::TxnOpGetResponseSet;
29use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
30use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::FlownodeId;
33
/// The key segment that identifies flow info entries.
///
/// It is the first component of the unscoped key `info/{flow_id}`.
const FLOW_INFO_KEY_PREFIX: &str = "info";

lazy_static! {
    /// Matches a complete unscoped key of the form `info/{flow_id}` and captures
    /// the run of decimal digits making up the flow id as group 1.
    ///
    /// The pattern only checks that the id consists of ASCII digits; it places no
    /// bound on how many digits there are.
    static ref FLOW_INFO_KEY_PATTERN: Regex =
        Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}
40
/// The key under which the metadata of a flow is stored.
///
/// The layout: `__flow/info/{flow_id}`.
///
/// This is a newtype around a `FlowScoped<FlowInfoKeyInner>`: the inner key
/// produces the `info/{flow_id}` part, and the scoped wrapper handles the rest
/// of the layout.
pub struct FlowInfoKey(FlowScoped<FlowInfoKeyInner>);
45
/// The result type produced by the decoder closures that
/// `FlowInfoManager::build_create_txn` and `FlowInfoManager::build_update_txn`
/// return alongside their transactions.
///
/// On success it carries an optional [FlowInfoValue] together with its raw bytes.
// NOTE(review): presumably `None` means the response set held no entry for the
// key — confirm against `TxnOpGetResponseSet::decode_with`.
pub type FlowInfoDecodeResult = Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>;
47
48impl<'a> MetadataKey<'a, FlowInfoKey> for FlowInfoKey {
49    fn to_bytes(&self) -> Vec<u8> {
50        self.0.to_bytes()
51    }
52
53    fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKey> {
54        Ok(FlowInfoKey(FlowScoped::<FlowInfoKeyInner>::from_bytes(
55            bytes,
56        )?))
57    }
58}
59
60impl FlowInfoKey {
61    /// Returns the [FlowInfoKey].
62    pub fn new(flow_id: FlowId) -> FlowInfoKey {
63        let inner = FlowInfoKeyInner::new(flow_id);
64        FlowInfoKey(FlowScoped::new(inner))
65    }
66
67    /// Returns the [FlowId].
68    pub fn flow_id(&self) -> FlowId {
69        self.0.flow_id
70    }
71}
72
/// The unscoped part of the flow metadata key.
///
/// It is encoded as `info/{flow_id}` and is wrapped in a `FlowScoped` by
/// `FlowInfoKey` to form the full key.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowInfoKeyInner {
    /// The id of the flow whose metadata this key addresses.
    flow_id: FlowId,
}
78
79impl FlowInfoKeyInner {
80    /// Returns a [FlowInfoKey] with the specified `flow_id`.
81    pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
82        FlowInfoKeyInner { flow_id }
83    }
84}
85
86impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
87    fn to_bytes(&self) -> Vec<u8> {
88        format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes()
89    }
90
91    fn from_bytes(bytes: &'a [u8]) -> Result<FlowInfoKeyInner> {
92        let key = std::str::from_utf8(bytes).map_err(|e| {
93            error::InvalidMetadataSnafu {
94                err_msg: format!(
95                    "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
96                    String::from_utf8_lossy(bytes)
97                ),
98            }
99            .build()
100        })?;
101        let captures =
102            FLOW_INFO_KEY_PATTERN
103                .captures(key)
104                .context(error::InvalidMetadataSnafu {
105                    err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
106                })?;
107        // Safety: pass the regex check above
108        let flow_id = captures[1].parse::<FlowId>().unwrap();
109        Ok(FlowInfoKeyInner { flow_id })
110    }
111}
112
/// The metadata of the flow.
///
/// This is the value stored under a [FlowInfoKey]. It is serialized with serde,
/// so the field set and the `#[serde(default)]` markers below are part of the
/// stored format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
    /// The source tables used by the flow.
    pub source_table_ids: Vec<TableId>,
    /// The sink table used by the flow.
    pub sink_table_name: TableName,
    /// Which flow nodes this flow is running on, keyed by flow partition.
    pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
    /// The catalog name.
    pub catalog_name: String,
    /// The query context that was in effect when the flow was created.
    ///
    /// Although a flow doesn't belong to any schema, the query context of the
    /// `create_flow` call is remembered so that the flow can be recovered with the
    /// same sql and query context after a db restart.
    /// If `None`, the default query context should be used.
    ///
    /// Falls back to `None` when the field is absent from the serialized value.
    #[serde(default)]
    pub query_context: Option<crate::rpc::ddl::QueryContext>,
    /// The flow name.
    pub flow_name: String,
    /// The raw sql.
    pub raw_sql: String,
    /// The expire expression of the flow.
    /// Duration in seconds as `i64`.
    pub expire_after: Option<i64>,
    /// The comment.
    pub comment: String,
    /// The options.
    pub options: HashMap<String, String>,
    /// The created time.
    ///
    /// Falls back to the type's `Default` value when the field is absent from the
    /// serialized value.
    #[serde(default)]
    pub created_time: DateTime<Utc>,
    /// The updated time.
    ///
    /// Falls back to the type's `Default` value when the field is absent from the
    /// serialized value.
    #[serde(default)]
    pub updated_time: DateTime<Utc>,
}
149
150impl FlowInfoValue {
151    /// Returns the `flownode_id`.
152    pub fn flownode_ids(&self) -> &BTreeMap<FlowPartitionId, FlownodeId> {
153        &self.flownode_ids
154    }
155
156    /// Insert a new flownode id for a partition.
157    pub fn insert_flownode_id(
158        &mut self,
159        partition: FlowPartitionId,
160        node: FlownodeId,
161    ) -> Option<FlownodeId> {
162        self.flownode_ids.insert(partition, node)
163    }
164
165    /// Returns the `source_table`.
166    pub fn source_table_ids(&self) -> &[TableId] {
167        &self.source_table_ids
168    }
169
170    pub fn catalog_name(&self) -> &String {
171        &self.catalog_name
172    }
173
174    pub fn query_context(&self) -> &Option<crate::rpc::ddl::QueryContext> {
175        &self.query_context
176    }
177
178    pub fn flow_name(&self) -> &String {
179        &self.flow_name
180    }
181
182    pub fn sink_table_name(&self) -> &TableName {
183        &self.sink_table_name
184    }
185
186    pub fn raw_sql(&self) -> &String {
187        &self.raw_sql
188    }
189
190    pub fn expire_after(&self) -> Option<i64> {
191        self.expire_after
192    }
193
194    pub fn comment(&self) -> &String {
195        &self.comment
196    }
197
198    pub fn options(&self) -> &HashMap<String, String> {
199        &self.options
200    }
201
202    pub fn created_time(&self) -> &DateTime<Utc> {
203        &self.created_time
204    }
205
206    pub fn updated_time(&self) -> &DateTime<Utc> {
207        &self.updated_time
208    }
209}
210
/// A shared, reference-counted handle to a [FlowInfoManager].
pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
212
/// The manager of [FlowInfoKey].
///
/// It reads [FlowInfoValue]s from the kv backend and builds the transactions
/// that create or update them.
pub struct FlowInfoManager {
    /// The kv backend that holds the flow info entries.
    kv_backend: KvBackendRef,
}
217
218impl FlowInfoManager {
219    /// Returns a new [FlowInfoManager].
220    pub fn new(kv_backend: KvBackendRef) -> Self {
221        Self { kv_backend }
222    }
223
224    /// Returns the [FlowInfoValue] of specified `flow_id`.
225    pub async fn get(&self, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
226        let key = FlowInfoKey::new(flow_id).to_bytes();
227        self.kv_backend
228            .get(&key)
229            .await?
230            .map(|x| FlowInfoValue::try_from_raw_value(&x.value))
231            .transpose()
232    }
233
234    /// Returns the [FlowInfoValue] with original bytes of specified `flow_id`.
235    pub async fn get_raw(
236        &self,
237        flow_id: FlowId,
238    ) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
239        let key = FlowInfoKey::new(flow_id).to_bytes();
240        self.kv_backend
241            .get(&key)
242            .await?
243            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
244            .transpose()
245    }
246
247    /// Builds a create flow transaction.
248    /// It is expected that the `__flow/info/{flow_id}` wasn't occupied.
249    /// Otherwise, the transaction will retrieve existing value.
250    pub(crate) fn build_create_txn(
251        &self,
252        flow_id: FlowId,
253        flow_value: &FlowInfoValue,
254    ) -> Result<(
255        Txn,
256        impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
257    )> {
258        let key = FlowInfoKey::new(flow_id).to_bytes();
259        let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?);
260
261        Ok((
262            txn,
263            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
264        ))
265    }
266
267    /// Builds a update flow transaction.
268    /// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied and equal to `prev_flow_value`,
269    /// but the new value can be the same, so to allow replace operation to happen even when the value is the same.
270    /// Otherwise, the transaction will retrieve existing value and fail.
271    pub(crate) fn build_update_txn(
272        &self,
273        flow_id: FlowId,
274        current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
275        new_flow_value: &FlowInfoValue,
276    ) -> Result<(
277        Txn,
278        impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
279    )> {
280        let key = FlowInfoKey::new(flow_id).to_bytes();
281        let raw_value = new_flow_value.try_as_raw_value()?;
282        let prev_value = current_flow_value.get_raw_bytes();
283        let txn = Txn::new()
284            .when(vec![Compare::new(
285                key.clone(),
286                CompareOp::Equal,
287                Some(prev_value),
288            )])
289            .and_then(vec![TxnOp::Put(key.clone(), raw_value)])
290            .or_else(vec![TxnOp::Get(key.clone())]);
291
292        Ok((
293            txn,
294            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
295        ))
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// A key built from a flow id encodes to the `__flow/info/{flow_id}` layout.
    #[test]
    fn test_key_serialization() {
        let encoded = FlowInfoKey::new(2).to_bytes();
        assert_eq!(b"__flow/info/2".to_vec(), encoded);
    }

    /// Decoding a `__flow/info/{flow_id}` key recovers the flow id.
    #[test]
    fn test_key_deserialization() {
        let raw_key: Vec<u8> = b"__flow/info/2".to_vec();
        let decoded = FlowInfoKey::from_bytes(raw_key.as_slice()).unwrap();
        assert_eq!(decoded.flow_id(), 2);
    }
}