common_meta/key/flow/
flow_state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use serde::{Deserialize, Serialize};
19
20use crate::error::{self, Result};
21use crate::key::flow::FlowScoped;
22use crate::key::{FlowId, MetadataKey, MetadataValue};
23use crate::kv_backend::KvBackendRef;
24use crate::rpc::store::PutRequest;
25
/// The entire map from `FlowId` to flow state size is stored directly in the value of this single key.
27const FLOW_STATE_KEY: &str = "state";
28
/// Innermost part of the flow state key.
///
/// This is a unit type: it carries no data, so all instances compare equal.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowStateKeyInner;

impl FlowStateKeyInner {
    /// Creates the inner key.
    pub fn new() -> Self {
        FlowStateKeyInner
    }
}
38
39impl<'a> MetadataKey<'a, FlowStateKeyInner> for FlowStateKeyInner {
40    fn to_bytes(&self) -> Vec<u8> {
41        FLOW_STATE_KEY.as_bytes().to_vec()
42    }
43
44    fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKeyInner> {
45        let key = std::str::from_utf8(bytes).map_err(|e| {
46            error::InvalidMetadataSnafu {
47                err_msg: format!(
48                    "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
49                    String::from_utf8_lossy(bytes)
50                ),
51            }
52            .build()
53        })?;
54        if key != FLOW_STATE_KEY {
55            return Err(error::InvalidMetadataSnafu {
56                err_msg: format!("Invalid FlowStateKeyInner '{key}'"),
57            }
58            .build());
59        }
60        Ok(FlowStateKeyInner::new())
61    }
62}
63
64/// The key stores the state size of the flow.
65///
66/// The layout: `__flow/state`.
67pub struct FlowStateKey(FlowScoped<FlowStateKeyInner>);
68
69impl FlowStateKey {
70    /// Returns the [FlowStateKey].
71    pub fn new() -> FlowStateKey {
72        let inner = FlowStateKeyInner::new();
73        FlowStateKey(FlowScoped::new(inner))
74    }
75}
76
77impl Default for FlowStateKey {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83impl<'a> MetadataKey<'a, FlowStateKey> for FlowStateKey {
84    fn to_bytes(&self) -> Vec<u8> {
85        self.0.to_bytes()
86    }
87
88    fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKey> {
89        Ok(FlowStateKey(FlowScoped::<FlowStateKeyInner>::from_bytes(
90            bytes,
91        )?))
92    }
93}
94
95/// The value of flow state size
96#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
97pub struct FlowStateValue {
98    /// For each key, the bytes of the state in memory
99    pub state_size: BTreeMap<FlowId, usize>,
100    /// For each key, the last execution time of flow in unix timestamp milliseconds.
101    pub last_exec_time_map: BTreeMap<FlowId, i64>,
102}
103
104impl FlowStateValue {
105    pub fn new(
106        state_size: BTreeMap<FlowId, usize>,
107        last_exec_time_map: BTreeMap<FlowId, i64>,
108    ) -> Self {
109        Self {
110            state_size,
111            last_exec_time_map,
112        }
113    }
114}
115
116pub type FlowStateManagerRef = Arc<FlowStateManager>;
117
118/// The manager of [FlowStateKey]. Since state size changes frequently, we store it in memory.
119///
120/// This is only used in distributed mode. When meta-srv use heartbeat to update the flow stat report
121/// and frontned use get to get the latest flow stat report.
122pub struct FlowStateManager {
123    in_memory: KvBackendRef,
124}
125
126impl FlowStateManager {
127    pub fn new(in_memory: KvBackendRef) -> Self {
128        Self { in_memory }
129    }
130
131    pub async fn get(&self) -> Result<Option<FlowStateValue>> {
132        let key = FlowStateKey::new().to_bytes();
133        self.in_memory
134            .get(&key)
135            .await?
136            .map(|x| FlowStateValue::try_from_raw_value(&x.value))
137            .transpose()
138    }
139
140    pub async fn put(&self, value: FlowStateValue) -> Result<()> {
141        let key = FlowStateKey::new().to_bytes();
142        let value = value.try_as_raw_value()?;
143        let req = PutRequest::new().with_key(key).with_value(value);
144        self.in_memory.put(req).await?;
145        Ok(())
146    }
147}
148
149/// Flow's state report, send regularly through heartbeat message
150#[derive(Debug, Clone)]
151pub struct FlowStat {
152    /// For each key, the bytes of the state in memory
153    pub state_size: BTreeMap<u32, usize>,
154    /// For each key, the last execution time of flow in unix timestamp milliseconds.
155    pub last_exec_time_map: BTreeMap<FlowId, i64>,
156}
157
158impl From<FlowStateValue> for FlowStat {
159    fn from(value: FlowStateValue) -> Self {
160        Self {
161            state_size: value.state_size,
162            last_exec_time_map: value.last_exec_time_map,
163        }
164    }
165}
166
167impl From<FlowStat> for FlowStateValue {
168    fn from(value: FlowStat) -> Self {
169        Self {
170            state_size: value.state_size,
171            last_exec_time_map: value.last_exec_time_map,
172        }
173    }
174}