1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33 pub datanode_id: DatanodeId,
34 pub table_id: TableId,
35 pub region_number: RegionNumber,
36 pub engine: String,
37}
38
39impl RegionIdent {
40 pub fn get_region_id(&self) -> RegionId {
41 RegionId::new(self.table_id, self.region_number)
42 }
43}
44
45impl Display for RegionIdent {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50 self.datanode_id, self.table_id, self.region_number, self.engine
51 )
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58 pub last_entry_id: Option<u64>,
60 pub metadata_last_entry_id: Option<u64>,
62 pub exists: bool,
64 pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(
71 f,
72 "(last_entry_id={:?}, exists={}, error={:?})",
73 self.last_entry_id, self.exists, self.error
74 )
75 }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80 pub result: bool,
81 pub error: Option<String>,
82}
83
84#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
86pub struct FlushRegionReply {
87 pub results: Vec<(RegionId, Result<(), String>)>,
91 pub overall_success: bool,
93}
94
95impl FlushRegionReply {
96 pub fn success_single(region_id: RegionId) -> Self {
98 Self {
99 results: vec![(region_id, Ok(()))],
100 overall_success: true,
101 }
102 }
103
104 pub fn error_single(region_id: RegionId, error: String) -> Self {
106 Self {
107 results: vec![(region_id, Err(error))],
108 overall_success: false,
109 }
110 }
111
112 pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
114 let overall_success = results.iter().all(|(_, result)| result.is_ok());
115 Self {
116 results,
117 overall_success,
118 }
119 }
120
121 pub fn to_simple_reply(&self) -> SimpleReply {
123 if self.overall_success {
124 SimpleReply {
125 result: true,
126 error: None,
127 }
128 } else {
129 let errors: Vec<String> = self
130 .results
131 .iter()
132 .filter_map(|(region_id, result)| {
133 result
134 .as_ref()
135 .err()
136 .map(|err| format!("{}: {}", region_id, err))
137 })
138 .collect();
139 SimpleReply {
140 result: false,
141 error: Some(errors.join("; ")),
142 }
143 }
144 }
145}
146
147impl Display for SimpleReply {
148 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149 write!(f, "(result={}, error={:?})", self.result, self.error)
150 }
151}
152
153impl Display for FlushRegionReply {
154 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155 let results_str = self
156 .results
157 .iter()
158 .map(|(region_id, result)| match result {
159 Ok(()) => format!("{}:OK", region_id),
160 Err(err) => format!("{}:ERR({})", region_id, err),
161 })
162 .collect::<Vec<_>>()
163 .join(", ");
164 write!(
165 f,
166 "(overall_success={}, results=[{}])",
167 self.overall_success, results_str
168 )
169 }
170}
171
172impl Display for OpenRegion {
173 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
174 write!(
175 f,
176 "OpenRegion(region_ident={}, region_storage_path={})",
177 self.region_ident, self.region_storage_path
178 )
179 }
180}
181
182#[serde_with::serde_as]
183#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
184pub struct OpenRegion {
185 pub region_ident: RegionIdent,
186 pub region_storage_path: String,
187 pub region_options: HashMap<String, String>,
188 #[serde(default)]
189 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
190 pub region_wal_options: HashMap<RegionNumber, String>,
191 #[serde(default)]
192 pub skip_wal_replay: bool,
193}
194
195impl OpenRegion {
196 pub fn new(
197 region_ident: RegionIdent,
198 path: &str,
199 region_options: HashMap<String, String>,
200 region_wal_options: HashMap<RegionNumber, String>,
201 skip_wal_replay: bool,
202 ) -> Self {
203 Self {
204 region_ident,
205 region_storage_path: path.to_string(),
206 region_options,
207 region_wal_options,
208 skip_wal_replay,
209 }
210 }
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
215pub struct DowngradeRegion {
216 pub region_id: RegionId,
218 #[serde(default)]
222 pub flush_timeout: Option<Duration>,
223}
224
225impl Display for DowngradeRegion {
226 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
227 write!(
228 f,
229 "DowngradeRegion(region_id={}, flush_timeout={:?})",
230 self.region_id, self.flush_timeout,
231 )
232 }
233}
234
235#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237pub struct UpgradeRegion {
238 pub region_id: RegionId,
240 pub last_entry_id: Option<u64>,
242 pub metadata_last_entry_id: Option<u64>,
244 #[serde(with = "humantime_serde")]
249 pub replay_timeout: Option<Duration>,
250 #[serde(default)]
252 pub location_id: Option<u64>,
253 #[serde(default, skip_serializing_if = "Option::is_none")]
254 pub replay_entry_id: Option<u64>,
255 #[serde(default, skip_serializing_if = "Option::is_none")]
256 pub metadata_replay_entry_id: Option<u64>,
257}
258
259impl UpgradeRegion {
260 pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
262 self.replay_entry_id = replay_entry_id;
263 self
264 }
265
266 pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
268 self.metadata_replay_entry_id = metadata_replay_entry_id;
269 self
270 }
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
274pub enum CacheIdent {
276 FlowId(FlowId),
277 FlowNodeAddressChange(u64),
279 FlowName(FlowName),
280 TableId(TableId),
281 TableName(TableName),
282 SchemaName(SchemaName),
283 CreateFlow(CreateFlow),
284 DropFlow(DropFlow),
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
288pub struct CreateFlow {
289 pub flow_id: FlowId,
291 pub source_table_ids: Vec<TableId>,
292 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
294}
295
296#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
297pub struct DropFlow {
298 pub flow_id: FlowId,
299 pub source_table_ids: Vec<TableId>,
300 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
306pub enum FlushStrategy {
307 #[default]
309 Sync,
310 Async,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
316pub enum FlushErrorStrategy {
317 #[default]
319 FailFast,
320 TryAll,
322}
323
324#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
327pub struct FlushRegions {
328 pub region_ids: Vec<RegionId>,
330 #[serde(default)]
332 pub strategy: FlushStrategy,
333 #[serde(default)]
335 pub error_strategy: FlushErrorStrategy,
336}
337
338impl FlushRegions {
339 pub fn sync_single(region_id: RegionId) -> Self {
341 Self {
342 region_ids: vec![region_id],
343 strategy: FlushStrategy::Sync,
344 error_strategy: FlushErrorStrategy::FailFast,
345 }
346 }
347
348 pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
350 Self {
351 region_ids,
352 strategy: FlushStrategy::Async,
353 error_strategy: FlushErrorStrategy::TryAll,
354 }
355 }
356
357 pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
359 Self {
360 region_ids,
361 strategy: FlushStrategy::Sync,
362 error_strategy,
363 }
364 }
365
366 pub fn is_single_region(&self) -> bool {
368 self.region_ids.len() == 1
369 }
370
371 pub fn single_region_id(&self) -> Option<RegionId> {
373 if self.is_single_region() {
374 self.region_ids.first().copied()
375 } else {
376 None
377 }
378 }
379
380 pub fn is_hint(&self) -> bool {
382 matches!(self.strategy, FlushStrategy::Async)
383 }
384
385 pub fn is_sync(&self) -> bool {
387 matches!(self.strategy, FlushStrategy::Sync)
388 }
389}
390
391impl From<RegionId> for FlushRegions {
392 fn from(region_id: RegionId) -> Self {
393 Self::sync_single(region_id)
394 }
395}
396
397#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
398pub enum Instruction {
399 OpenRegion(OpenRegion),
403 CloseRegion(RegionIdent),
407 UpgradeRegion(UpgradeRegion),
409 DowngradeRegion(DowngradeRegion),
411 InvalidateCaches(Vec<CacheIdent>),
413 FlushRegions(FlushRegions),
415}
416
417#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
419pub struct UpgradeRegionReply {
420 pub ready: bool,
422 pub exists: bool,
424 pub error: Option<String>,
426}
427
428impl Display for UpgradeRegionReply {
429 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
430 write!(
431 f,
432 "(ready={}, exists={}, error={:?})",
433 self.ready, self.exists, self.error
434 )
435 }
436}
437
438#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
439#[serde(tag = "type", rename_all = "snake_case")]
440pub enum InstructionReply {
441 OpenRegion(SimpleReply),
442 CloseRegion(SimpleReply),
443 UpgradeRegion(UpgradeRegionReply),
444 DowngradeRegion(DowngradeRegionReply),
445 FlushRegions(FlushRegionReply),
446}
447
448impl Display for InstructionReply {
449 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
450 match self {
451 Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
452 Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
453 Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
454 Self::DowngradeRegion(reply) => {
455 write!(f, "InstructionReply::DowngradeRegion({})", reply)
456 }
457 Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
458 }
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465
466 #[test]
467 fn test_serialize_instruction() {
468 let open_region = Instruction::OpenRegion(OpenRegion::new(
469 RegionIdent {
470 datanode_id: 2,
471 table_id: 1024,
472 region_number: 1,
473 engine: "mito2".to_string(),
474 },
475 "test/foo",
476 HashMap::new(),
477 HashMap::new(),
478 false,
479 ));
480
481 let serialized = serde_json::to_string(&open_region).unwrap();
482
483 assert_eq!(
484 r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
485 serialized
486 );
487
488 let close_region = Instruction::CloseRegion(RegionIdent {
489 datanode_id: 2,
490 table_id: 1024,
491 region_number: 1,
492 engine: "mito2".to_string(),
493 });
494
495 let serialized = serde_json::to_string(&close_region).unwrap();
496
497 assert_eq!(
498 r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
499 serialized
500 );
501 }
502
503 #[derive(Debug, Clone, Serialize, Deserialize)]
504 struct LegacyOpenRegion {
505 region_ident: RegionIdent,
506 region_storage_path: String,
507 region_options: HashMap<String, String>,
508 }
509
510 #[test]
511 fn test_compatible_serialize_open_region() {
512 let region_ident = RegionIdent {
513 datanode_id: 2,
514 table_id: 1024,
515 region_number: 1,
516 engine: "mito2".to_string(),
517 };
518 let region_storage_path = "test/foo".to_string();
519 let region_options = HashMap::from([
520 ("a".to_string(), "aa".to_string()),
521 ("b".to_string(), "bb".to_string()),
522 ]);
523
524 let legacy_open_region = LegacyOpenRegion {
526 region_ident: region_ident.clone(),
527 region_storage_path: region_storage_path.clone(),
528 region_options: region_options.clone(),
529 };
530 let serialized = serde_json::to_string(&legacy_open_region).unwrap();
531
532 let deserialized = serde_json::from_str(&serialized).unwrap();
534 let expected = OpenRegion {
535 region_ident,
536 region_storage_path,
537 region_options,
538 region_wal_options: HashMap::new(),
539 skip_wal_replay: false,
540 };
541 assert_eq!(expected, deserialized);
542 }
543
544 #[test]
545 fn test_flush_regions_creation() {
546 let region_id = RegionId::new(1024, 1);
547
548 let single_sync = FlushRegions::sync_single(region_id);
550 assert_eq!(single_sync.region_ids, vec![region_id]);
551 assert_eq!(single_sync.strategy, FlushStrategy::Sync);
552 assert!(!single_sync.is_hint());
553 assert!(single_sync.is_sync());
554 assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
555 assert!(single_sync.is_single_region());
556 assert_eq!(single_sync.single_region_id(), Some(region_id));
557
558 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
560 let batch_async = FlushRegions::async_batch(region_ids.clone());
561 assert_eq!(batch_async.region_ids, region_ids);
562 assert_eq!(batch_async.strategy, FlushStrategy::Async);
563 assert!(batch_async.is_hint());
564 assert!(!batch_async.is_sync());
565 assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
566 assert!(!batch_async.is_single_region());
567 assert_eq!(batch_async.single_region_id(), None);
568
569 let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
571 assert_eq!(batch_sync.region_ids, region_ids);
572 assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
573 assert!(!batch_sync.is_hint());
574 assert!(batch_sync.is_sync());
575 assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
576 }
577
578 #[test]
579 fn test_flush_regions_conversion() {
580 let region_id = RegionId::new(1024, 1);
581
582 let from_region_id: FlushRegions = region_id.into();
583 assert_eq!(from_region_id.region_ids, vec![region_id]);
584 assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
585 assert!(!from_region_id.is_hint());
586 assert!(from_region_id.is_sync());
587
588 let flush_regions = FlushRegions {
590 region_ids: vec![region_id],
591 strategy: FlushStrategy::Async,
592 error_strategy: FlushErrorStrategy::TryAll,
593 };
594 assert_eq!(flush_regions.region_ids, vec![region_id]);
595 assert_eq!(flush_regions.strategy, FlushStrategy::Async);
596 assert!(flush_regions.is_hint());
597 assert!(!flush_regions.is_sync());
598 }
599
600 #[test]
601 fn test_flush_region_reply() {
602 let region_id = RegionId::new(1024, 1);
603
604 let success_reply = FlushRegionReply::success_single(region_id);
606 assert!(success_reply.overall_success);
607 assert_eq!(success_reply.results.len(), 1);
608 assert_eq!(success_reply.results[0].0, region_id);
609 assert!(success_reply.results[0].1.is_ok());
610
611 let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
613 assert!(!error_reply.overall_success);
614 assert_eq!(error_reply.results.len(), 1);
615 assert_eq!(error_reply.results[0].0, region_id);
616 assert!(error_reply.results[0].1.is_err());
617
618 let region_id2 = RegionId::new(1024, 2);
620 let results = vec![
621 (region_id, Ok(())),
622 (region_id2, Err("flush failed".to_string())),
623 ];
624 let batch_reply = FlushRegionReply::from_results(results);
625 assert!(!batch_reply.overall_success);
626 assert_eq!(batch_reply.results.len(), 2);
627
628 let simple_reply = batch_reply.to_simple_reply();
630 assert!(!simple_reply.result);
631 assert!(simple_reply.error.is_some());
632 assert!(simple_reply.error.unwrap().contains("flush failed"));
633 }
634
635 #[test]
636 fn test_serialize_flush_regions_instruction() {
637 let region_id = RegionId::new(1024, 1);
638 let flush_regions = FlushRegions::sync_single(region_id);
639 let instruction = Instruction::FlushRegions(flush_regions.clone());
640
641 let serialized = serde_json::to_string(&instruction).unwrap();
642 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
643
644 match deserialized {
645 Instruction::FlushRegions(fr) => {
646 assert_eq!(fr.region_ids, vec![region_id]);
647 assert_eq!(fr.strategy, FlushStrategy::Sync);
648 assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
649 }
650 _ => panic!("Expected FlushRegions instruction"),
651 }
652 }
653
654 #[test]
655 fn test_serialize_flush_regions_batch_instruction() {
656 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
657 let flush_regions =
658 FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
659 let instruction = Instruction::FlushRegions(flush_regions);
660
661 let serialized = serde_json::to_string(&instruction).unwrap();
662 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
663
664 match deserialized {
665 Instruction::FlushRegions(fr) => {
666 assert_eq!(fr.region_ids, region_ids);
667 assert_eq!(fr.strategy, FlushStrategy::Sync);
668 assert!(!fr.is_hint());
669 assert!(fr.is_sync());
670 assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
671 }
672 _ => panic!("Expected FlushRegions instruction"),
673 }
674 }
675}