common_meta/key/flow/
flow_state.rs1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use serde::{Deserialize, Serialize};
19
20use crate::error::{self, Result};
21use crate::key::flow::FlowScoped;
22use crate::key::{FlowId, MetadataKey, MetadataValue};
23use crate::kv_backend::KvBackendRef;
24use crate::rpc::store::PutRequest;
25
26const FLOW_STATE_KEY: &str = "state";
28
29#[derive(Debug, Clone, Copy, PartialEq)]
31struct FlowStateKeyInner;
32
33impl FlowStateKeyInner {
34 pub fn new() -> Self {
35 Self
36 }
37}
38
39impl<'a> MetadataKey<'a, FlowStateKeyInner> for FlowStateKeyInner {
40 fn to_bytes(&self) -> Vec<u8> {
41 FLOW_STATE_KEY.as_bytes().to_vec()
42 }
43
44 fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKeyInner> {
45 let key = std::str::from_utf8(bytes).map_err(|e| {
46 error::InvalidMetadataSnafu {
47 err_msg: format!(
48 "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
49 String::from_utf8_lossy(bytes)
50 ),
51 }
52 .build()
53 })?;
54 if key != FLOW_STATE_KEY {
55 return Err(error::InvalidMetadataSnafu {
56 err_msg: format!("Invalid FlowStateKeyInner '{key}'"),
57 }
58 .build());
59 }
60 Ok(FlowStateKeyInner::new())
61 }
62}
63
64pub struct FlowStateKey(FlowScoped<FlowStateKeyInner>);
68
69impl FlowStateKey {
70 pub fn new() -> FlowStateKey {
72 let inner = FlowStateKeyInner::new();
73 FlowStateKey(FlowScoped::new(inner))
74 }
75}
76
77impl Default for FlowStateKey {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl<'a> MetadataKey<'a, FlowStateKey> for FlowStateKey {
84 fn to_bytes(&self) -> Vec<u8> {
85 self.0.to_bytes()
86 }
87
88 fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKey> {
89 Ok(FlowStateKey(FlowScoped::<FlowStateKeyInner>::from_bytes(
90 bytes,
91 )?))
92 }
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
97pub struct FlowStateValue {
98 pub state_size: BTreeMap<FlowId, usize>,
100 pub last_exec_time_map: BTreeMap<FlowId, i64>,
102}
103
104impl FlowStateValue {
105 pub fn new(
106 state_size: BTreeMap<FlowId, usize>,
107 last_exec_time_map: BTreeMap<FlowId, i64>,
108 ) -> Self {
109 Self {
110 state_size,
111 last_exec_time_map,
112 }
113 }
114}
115
116pub type FlowStateManagerRef = Arc<FlowStateManager>;
117
118pub struct FlowStateManager {
123 in_memory: KvBackendRef,
124}
125
126impl FlowStateManager {
127 pub fn new(in_memory: KvBackendRef) -> Self {
128 Self { in_memory }
129 }
130
131 pub async fn get(&self) -> Result<Option<FlowStateValue>> {
132 let key = FlowStateKey::new().to_bytes();
133 self.in_memory
134 .get(&key)
135 .await?
136 .map(|x| FlowStateValue::try_from_raw_value(&x.value))
137 .transpose()
138 }
139
140 pub async fn put(&self, value: FlowStateValue) -> Result<()> {
141 let key = FlowStateKey::new().to_bytes();
142 let value = value.try_as_raw_value()?;
143 let req = PutRequest::new().with_key(key).with_value(value);
144 self.in_memory.put(req).await?;
145 Ok(())
146 }
147}
148
149#[derive(Debug, Clone)]
151pub struct FlowStat {
152 pub state_size: BTreeMap<u32, usize>,
154 pub last_exec_time_map: BTreeMap<FlowId, i64>,
156}
157
158impl From<FlowStateValue> for FlowStat {
159 fn from(value: FlowStateValue) -> Self {
160 Self {
161 state_size: value.state_size,
162 last_exec_time_map: value.last_exec_time_map,
163 }
164 }
165}
166
167impl From<FlowStat> for FlowStateValue {
168 fn from(value: FlowStat) -> Self {
169 Self {
170 state_size: value.state_size,
171 last_exec_time_map: value.last_exec_time_map,
172 }
173 }
174}