1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33 pub datanode_id: DatanodeId,
34 pub table_id: TableId,
35 pub region_number: RegionNumber,
36 pub engine: String,
37}
38
39impl RegionIdent {
40 pub fn get_region_id(&self) -> RegionId {
41 RegionId::new(self.table_id, self.region_number)
42 }
43}
44
45impl Display for RegionIdent {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50 self.datanode_id, self.table_id, self.region_number, self.engine
51 )
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58 pub last_entry_id: Option<u64>,
60 pub metadata_last_entry_id: Option<u64>,
62 pub exists: bool,
64 pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(
71 f,
72 "(last_entry_id={:?}, exists={}, error={:?})",
73 self.last_entry_id, self.exists, self.error
74 )
75 }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80 pub result: bool,
81 pub error: Option<String>,
82}
83
84impl Display for SimpleReply {
85 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
86 write!(f, "(result={}, error={:?})", self.result, self.error)
87 }
88}
89
90impl Display for OpenRegion {
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 write!(
93 f,
94 "OpenRegion(region_ident={}, region_storage_path={})",
95 self.region_ident, self.region_storage_path
96 )
97 }
98}
99
100#[serde_with::serde_as]
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub struct OpenRegion {
103 pub region_ident: RegionIdent,
104 pub region_storage_path: String,
105 pub region_options: HashMap<String, String>,
106 #[serde(default)]
107 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
108 pub region_wal_options: HashMap<RegionNumber, String>,
109 #[serde(default)]
110 pub skip_wal_replay: bool,
111}
112
113impl OpenRegion {
114 pub fn new(
115 region_ident: RegionIdent,
116 path: &str,
117 region_options: HashMap<String, String>,
118 region_wal_options: HashMap<RegionNumber, String>,
119 skip_wal_replay: bool,
120 ) -> Self {
121 Self {
122 region_ident,
123 region_storage_path: path.to_string(),
124 region_options,
125 region_wal_options,
126 skip_wal_replay,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
133pub struct DowngradeRegion {
134 pub region_id: RegionId,
136 #[serde(default)]
140 pub flush_timeout: Option<Duration>,
141}
142
143impl Display for DowngradeRegion {
144 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145 write!(
146 f,
147 "DowngradeRegion(region_id={}, flush_timeout={:?})",
148 self.region_id, self.flush_timeout,
149 )
150 }
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155pub struct UpgradeRegion {
156 pub region_id: RegionId,
158 pub last_entry_id: Option<u64>,
160 pub metadata_last_entry_id: Option<u64>,
162 #[serde(with = "humantime_serde")]
167 pub replay_timeout: Option<Duration>,
168 #[serde(default)]
170 pub location_id: Option<u64>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
174pub enum CacheIdent {
176 FlowId(FlowId),
177 FlowName(FlowName),
178 TableId(TableId),
179 TableName(TableName),
180 SchemaName(SchemaName),
181 CreateFlow(CreateFlow),
182 DropFlow(DropFlow),
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
186pub struct CreateFlow {
187 pub flow_id: FlowId,
189 pub source_table_ids: Vec<TableId>,
190 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
195pub struct DropFlow {
196 pub flow_id: FlowId,
197 pub source_table_ids: Vec<TableId>,
198 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
204pub struct FlushRegions {
205 pub region_ids: Vec<RegionId>,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
209pub enum Instruction {
210 OpenRegion(OpenRegion),
214 CloseRegion(RegionIdent),
218 UpgradeRegion(UpgradeRegion),
220 DowngradeRegion(DowngradeRegion),
222 InvalidateCaches(Vec<CacheIdent>),
224 FlushRegions(FlushRegions),
226 FlushRegion(RegionId),
228}
229
230#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
232pub struct UpgradeRegionReply {
233 pub ready: bool,
235 pub exists: bool,
237 pub error: Option<String>,
239}
240
241impl Display for UpgradeRegionReply {
242 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243 write!(
244 f,
245 "(ready={}, exists={}, error={:?})",
246 self.ready, self.exists, self.error
247 )
248 }
249}
250
251#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
252#[serde(tag = "type", rename_all = "snake_case")]
253pub enum InstructionReply {
254 OpenRegion(SimpleReply),
255 CloseRegion(SimpleReply),
256 UpgradeRegion(UpgradeRegionReply),
257 DowngradeRegion(DowngradeRegionReply),
258 FlushRegion(SimpleReply),
259}
260
261impl Display for InstructionReply {
262 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
263 match self {
264 Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
265 Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
266 Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
267 Self::DowngradeRegion(reply) => {
268 write!(f, "InstructionReply::DowngradeRegion({})", reply)
269 }
270 Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
271 }
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Region identifier shared by every test in this module.
    fn sample_region_ident() -> RegionIdent {
        RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        }
    }

    /// Pins the JSON form of the `OpenRegion` and `CloseRegion` instructions.
    #[test]
    fn test_serialize_instruction() {
        let open_region = Instruction::OpenRegion(OpenRegion::new(
            sample_region_ident(),
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
        ));
        let open_region_json = serde_json::to_string(&open_region).unwrap();
        assert_eq!(
            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
            open_region_json
        );

        let close_region = Instruction::CloseRegion(sample_region_ident());
        let close_region_json = serde_json::to_string(&close_region).unwrap();
        assert_eq!(
            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
            close_region_json
        );
    }

    /// Shape of `OpenRegion` without `region_wal_options` and `skip_wal_replay`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }

    /// A payload serialized from the legacy shape must still deserialize into
    /// `OpenRegion`, with the two missing fields taking their defaults.
    #[test]
    fn test_compatible_serialize_open_region() {
        let region_ident = sample_region_ident();
        let region_storage_path = String::from("test/foo");
        let region_options = HashMap::from([
            ("a".to_string(), "aa".to_string()),
            ("b".to_string(), "bb".to_string()),
        ]);

        let legacy_json = serde_json::to_string(&LegacyOpenRegion {
            region_ident: region_ident.clone(),
            region_storage_path: region_storage_path.clone(),
            region_options: region_options.clone(),
        })
        .unwrap();

        let deserialized: OpenRegion = serde_json::from_str(&legacy_json).unwrap();

        let expected = OpenRegion {
            region_ident,
            region_storage_path,
            region_options,
            region_wal_options: HashMap::new(),
            skip_wal_replay: false,
        };
        assert_eq!(expected, deserialized);
    }
}