1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::FlowId;
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33 pub datanode_id: DatanodeId,
34 pub table_id: TableId,
35 pub region_number: RegionNumber,
36 pub engine: String,
37}
38
39impl RegionIdent {
40 pub fn get_region_id(&self) -> RegionId {
41 RegionId::new(self.table_id, self.region_number)
42 }
43}
44
45impl Display for RegionIdent {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50 self.datanode_id, self.table_id, self.region_number, self.engine
51 )
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58 pub last_entry_id: Option<u64>,
60 pub metadata_last_entry_id: Option<u64>,
62 pub exists: bool,
64 pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(
71 f,
72 "(last_entry_id={:?}, exists={}, error={:?})",
73 self.last_entry_id, self.exists, self.error
74 )
75 }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80 pub result: bool,
81 pub error: Option<String>,
82}
83
84impl Display for SimpleReply {
85 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
86 write!(f, "(result={}, error={:?})", self.result, self.error)
87 }
88}
89
90impl Display for OpenRegion {
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 write!(
93 f,
94 "OpenRegion(region_ident={}, region_storage_path={})",
95 self.region_ident, self.region_storage_path
96 )
97 }
98}
99
100#[serde_with::serde_as]
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub struct OpenRegion {
103 pub region_ident: RegionIdent,
104 pub region_storage_path: String,
105 pub region_options: HashMap<String, String>,
106 #[serde(default)]
107 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
108 pub region_wal_options: HashMap<RegionNumber, String>,
109 #[serde(default)]
110 pub skip_wal_replay: bool,
111}
112
113impl OpenRegion {
114 pub fn new(
115 region_ident: RegionIdent,
116 path: &str,
117 region_options: HashMap<String, String>,
118 region_wal_options: HashMap<RegionNumber, String>,
119 skip_wal_replay: bool,
120 ) -> Self {
121 Self {
122 region_ident,
123 region_storage_path: path.to_string(),
124 region_options,
125 region_wal_options,
126 skip_wal_replay,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
133pub struct DowngradeRegion {
134 pub region_id: RegionId,
136 #[serde(default)]
140 pub flush_timeout: Option<Duration>,
141}
142
143impl Display for DowngradeRegion {
144 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145 write!(
146 f,
147 "DowngradeRegion(region_id={}, flush_timeout={:?})",
148 self.region_id, self.flush_timeout,
149 )
150 }
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155pub struct UpgradeRegion {
156 pub region_id: RegionId,
158 pub last_entry_id: Option<u64>,
160 pub metadata_last_entry_id: Option<u64>,
162 #[serde(with = "humantime_serde")]
167 pub replay_timeout: Option<Duration>,
168 #[serde(default)]
170 pub location_id: Option<u64>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
174pub enum CacheIdent {
176 FlowId(FlowId),
177 FlowName(FlowName),
178 TableId(TableId),
179 TableName(TableName),
180 SchemaName(SchemaName),
181 CreateFlow(CreateFlow),
182 DropFlow(DropFlow),
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
186pub struct CreateFlow {
187 pub source_table_ids: Vec<TableId>,
188 pub flownodes: Vec<Peer>,
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
192pub struct DropFlow {
193 pub source_table_ids: Vec<TableId>,
194 pub flownode_ids: Vec<FlownodeId>,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
199pub struct FlushRegions {
200 pub region_ids: Vec<RegionId>,
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
204pub enum Instruction {
205 OpenRegion(OpenRegion),
209 CloseRegion(RegionIdent),
213 UpgradeRegion(UpgradeRegion),
215 DowngradeRegion(DowngradeRegion),
217 InvalidateCaches(Vec<CacheIdent>),
219 FlushRegions(FlushRegions),
221 FlushRegion(RegionId),
223}
224
225#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
227pub struct UpgradeRegionReply {
228 pub ready: bool,
230 pub exists: bool,
232 pub error: Option<String>,
234}
235
236impl Display for UpgradeRegionReply {
237 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
238 write!(
239 f,
240 "(ready={}, exists={}, error={:?})",
241 self.ready, self.exists, self.error
242 )
243 }
244}
245
246#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
247#[serde(tag = "type", rename_all = "snake_case")]
248pub enum InstructionReply {
249 OpenRegion(SimpleReply),
250 CloseRegion(SimpleReply),
251 UpgradeRegion(UpgradeRegionReply),
252 DowngradeRegion(DowngradeRegionReply),
253 FlushRegion(SimpleReply),
254}
255
256impl Display for InstructionReply {
257 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
258 match self {
259 Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
260 Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
261 Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
262 Self::DowngradeRegion(reply) => {
263 write!(f, "InstructionReply::DowngradeRegion({})", reply)
264 }
265 Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
266 }
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Region identifier fixture shared by the tests in this module.
    fn sample_region_ident() -> RegionIdent {
        RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        }
    }

    /// Pins the exact JSON produced for the `OpenRegion` and `CloseRegion`
    /// instructions.
    #[test]
    fn test_serialize_instruction() {
        let open_region = Instruction::OpenRegion(OpenRegion::new(
            sample_region_ident(),
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
        ));
        let open_json = serde_json::to_string(&open_region).unwrap();
        assert_eq!(
            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
            open_json
        );

        let close_region = Instruction::CloseRegion(sample_region_ident());
        let close_json = serde_json::to_string(&close_region).unwrap();
        assert_eq!(
            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
            close_json
        );
    }

    /// An `OpenRegion` look-alike without the `region_wal_options` and
    /// `skip_wal_replay` fields, standing in for a payload that lacks them.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }

    /// A payload without the defaulted fields must still deserialize into
    /// `OpenRegion`, with those fields taking their default values.
    #[test]
    fn test_compatible_serialize_open_region() {
        let region_ident = sample_region_ident();
        let region_storage_path = String::from("test/foo");
        let region_options: HashMap<String, String> = [("a", "aa"), ("b", "bb")]
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect();

        // Serialize the payload that lacks the defaulted fields.
        let legacy = LegacyOpenRegion {
            region_ident: region_ident.clone(),
            region_storage_path: region_storage_path.clone(),
            region_options: region_options.clone(),
        };
        let legacy_json = serde_json::to_string(&legacy).unwrap();

        // Read it back as the current `OpenRegion`.
        let deserialized: OpenRegion = serde_json::from_str(&legacy_json).unwrap();

        let expected = OpenRegion {
            region_ident,
            region_storage_path,
            region_options,
            region_wal_options: HashMap::new(),
            skip_wal_replay: false,
        };
        assert_eq!(expected, deserialized);
    }
}