1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33 pub datanode_id: DatanodeId,
34 pub table_id: TableId,
35 pub region_number: RegionNumber,
36 pub engine: String,
37}
38
39impl RegionIdent {
40 pub fn get_region_id(&self) -> RegionId {
41 RegionId::new(self.table_id, self.region_number)
42 }
43}
44
45impl Display for RegionIdent {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50 self.datanode_id, self.table_id, self.region_number, self.engine
51 )
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58 pub last_entry_id: Option<u64>,
60 pub metadata_last_entry_id: Option<u64>,
62 pub exists: bool,
64 pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(
71 f,
72 "(last_entry_id={:?}, exists={}, error={:?})",
73 self.last_entry_id, self.exists, self.error
74 )
75 }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80 pub result: bool,
81 pub error: Option<String>,
82}
83
84impl Display for SimpleReply {
85 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
86 write!(f, "(result={}, error={:?})", self.result, self.error)
87 }
88}
89
90impl Display for OpenRegion {
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 write!(
93 f,
94 "OpenRegion(region_ident={}, region_storage_path={})",
95 self.region_ident, self.region_storage_path
96 )
97 }
98}
99
100#[serde_with::serde_as]
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub struct OpenRegion {
103 pub region_ident: RegionIdent,
104 pub region_storage_path: String,
105 pub region_options: HashMap<String, String>,
106 #[serde(default)]
107 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
108 pub region_wal_options: HashMap<RegionNumber, String>,
109 #[serde(default)]
110 pub skip_wal_replay: bool,
111}
112
113impl OpenRegion {
114 pub fn new(
115 region_ident: RegionIdent,
116 path: &str,
117 region_options: HashMap<String, String>,
118 region_wal_options: HashMap<RegionNumber, String>,
119 skip_wal_replay: bool,
120 ) -> Self {
121 Self {
122 region_ident,
123 region_storage_path: path.to_string(),
124 region_options,
125 region_wal_options,
126 skip_wal_replay,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
133pub struct DowngradeRegion {
134 pub region_id: RegionId,
136 #[serde(default)]
140 pub flush_timeout: Option<Duration>,
141}
142
143impl Display for DowngradeRegion {
144 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145 write!(
146 f,
147 "DowngradeRegion(region_id={}, flush_timeout={:?})",
148 self.region_id, self.flush_timeout,
149 )
150 }
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155pub struct UpgradeRegion {
156 pub region_id: RegionId,
158 pub last_entry_id: Option<u64>,
160 pub metadata_last_entry_id: Option<u64>,
162 #[serde(with = "humantime_serde")]
167 pub replay_timeout: Option<Duration>,
168 #[serde(default)]
170 pub location_id: Option<u64>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
174pub enum CacheIdent {
176 FlowId(FlowId),
177 FlowNodeAddressChange(u64),
179 FlowName(FlowName),
180 TableId(TableId),
181 TableName(TableName),
182 SchemaName(SchemaName),
183 CreateFlow(CreateFlow),
184 DropFlow(DropFlow),
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
188pub struct CreateFlow {
189 pub flow_id: FlowId,
191 pub source_table_ids: Vec<TableId>,
192 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
197pub struct DropFlow {
198 pub flow_id: FlowId,
199 pub source_table_ids: Vec<TableId>,
200 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
206pub struct FlushRegions {
207 pub region_ids: Vec<RegionId>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
211pub enum Instruction {
212 OpenRegion(OpenRegion),
216 CloseRegion(RegionIdent),
220 UpgradeRegion(UpgradeRegion),
222 DowngradeRegion(DowngradeRegion),
224 InvalidateCaches(Vec<CacheIdent>),
226 FlushRegions(FlushRegions),
228 FlushRegion(RegionId),
230}
231
232#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
234pub struct UpgradeRegionReply {
235 pub ready: bool,
237 pub exists: bool,
239 pub error: Option<String>,
241}
242
243impl Display for UpgradeRegionReply {
244 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
245 write!(
246 f,
247 "(ready={}, exists={}, error={:?})",
248 self.ready, self.exists, self.error
249 )
250 }
251}
252
253#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
254#[serde(tag = "type", rename_all = "snake_case")]
255pub enum InstructionReply {
256 OpenRegion(SimpleReply),
257 CloseRegion(SimpleReply),
258 UpgradeRegion(UpgradeRegionReply),
259 DowngradeRegion(DowngradeRegionReply),
260 FlushRegion(SimpleReply),
261}
262
263impl Display for InstructionReply {
264 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
265 match self {
266 Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
267 Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
268 Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
269 Self::DowngradeRegion(reply) => {
270 write!(f, "InstructionReply::DowngradeRegion({})", reply)
271 }
272 Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
273 }
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280
281 #[test]
282 fn test_serialize_instruction() {
283 let open_region = Instruction::OpenRegion(OpenRegion::new(
284 RegionIdent {
285 datanode_id: 2,
286 table_id: 1024,
287 region_number: 1,
288 engine: "mito2".to_string(),
289 },
290 "test/foo",
291 HashMap::new(),
292 HashMap::new(),
293 false,
294 ));
295
296 let serialized = serde_json::to_string(&open_region).unwrap();
297
298 assert_eq!(
299 r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
300 serialized
301 );
302
303 let close_region = Instruction::CloseRegion(RegionIdent {
304 datanode_id: 2,
305 table_id: 1024,
306 region_number: 1,
307 engine: "mito2".to_string(),
308 });
309
310 let serialized = serde_json::to_string(&close_region).unwrap();
311
312 assert_eq!(
313 r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
314 serialized
315 );
316 }
317
318 #[derive(Debug, Clone, Serialize, Deserialize)]
319 struct LegacyOpenRegion {
320 region_ident: RegionIdent,
321 region_storage_path: String,
322 region_options: HashMap<String, String>,
323 }
324
325 #[test]
326 fn test_compatible_serialize_open_region() {
327 let region_ident = RegionIdent {
328 datanode_id: 2,
329 table_id: 1024,
330 region_number: 1,
331 engine: "mito2".to_string(),
332 };
333 let region_storage_path = "test/foo".to_string();
334 let region_options = HashMap::from([
335 ("a".to_string(), "aa".to_string()),
336 ("b".to_string(), "bb".to_string()),
337 ]);
338
339 let legacy_open_region = LegacyOpenRegion {
341 region_ident: region_ident.clone(),
342 region_storage_path: region_storage_path.clone(),
343 region_options: region_options.clone(),
344 };
345 let serialized = serde_json::to_string(&legacy_open_region).unwrap();
346
347 let deserialized = serde_json::from_str(&serialized).unwrap();
349 let expected = OpenRegion {
350 region_ident,
351 region_storage_path,
352 region_options,
353 region_wal_options: HashMap::new(),
354 skip_wal_replay: false,
355 };
356 assert_eq!(expected, deserialized);
357 }
358}