1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33 pub datanode_id: DatanodeId,
34 pub table_id: TableId,
35 pub region_number: RegionNumber,
36 pub engine: String,
37}
38
39impl RegionIdent {
40 pub fn get_region_id(&self) -> RegionId {
41 RegionId::new(self.table_id, self.region_number)
42 }
43}
44
45impl Display for RegionIdent {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50 self.datanode_id, self.table_id, self.region_number, self.engine
51 )
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58 pub last_entry_id: Option<u64>,
60 pub metadata_last_entry_id: Option<u64>,
62 pub exists: bool,
64 pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(
71 f,
72 "(last_entry_id={:?}, exists={}, error={:?})",
73 self.last_entry_id, self.exists, self.error
74 )
75 }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80 pub result: bool,
81 pub error: Option<String>,
82}
83
84#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
86pub struct FlushRegionReply {
87 pub results: Vec<(RegionId, Result<(), String>)>,
91 pub overall_success: bool,
93}
94
95impl FlushRegionReply {
96 pub fn success_single(region_id: RegionId) -> Self {
98 Self {
99 results: vec![(region_id, Ok(()))],
100 overall_success: true,
101 }
102 }
103
104 pub fn error_single(region_id: RegionId, error: String) -> Self {
106 Self {
107 results: vec![(region_id, Err(error))],
108 overall_success: false,
109 }
110 }
111
112 pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
114 let overall_success = results.iter().all(|(_, result)| result.is_ok());
115 Self {
116 results,
117 overall_success,
118 }
119 }
120
121 pub fn to_simple_reply(&self) -> SimpleReply {
123 if self.overall_success {
124 SimpleReply {
125 result: true,
126 error: None,
127 }
128 } else {
129 let errors: Vec<String> = self
130 .results
131 .iter()
132 .filter_map(|(region_id, result)| {
133 result
134 .as_ref()
135 .err()
136 .map(|err| format!("{}: {}", region_id, err))
137 })
138 .collect();
139 SimpleReply {
140 result: false,
141 error: Some(errors.join("; ")),
142 }
143 }
144 }
145}
146
147impl Display for SimpleReply {
148 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149 write!(f, "(result={}, error={:?})", self.result, self.error)
150 }
151}
152
153impl Display for FlushRegionReply {
154 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155 let results_str = self
156 .results
157 .iter()
158 .map(|(region_id, result)| match result {
159 Ok(()) => format!("{}:OK", region_id),
160 Err(err) => format!("{}:ERR({})", region_id, err),
161 })
162 .collect::<Vec<_>>()
163 .join(", ");
164 write!(
165 f,
166 "(overall_success={}, results=[{}])",
167 self.overall_success, results_str
168 )
169 }
170}
171
172impl Display for OpenRegion {
173 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
174 write!(
175 f,
176 "OpenRegion(region_ident={}, region_storage_path={})",
177 self.region_ident, self.region_storage_path
178 )
179 }
180}
181
182#[serde_with::serde_as]
183#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
184pub struct OpenRegion {
185 pub region_ident: RegionIdent,
186 pub region_storage_path: String,
187 pub region_options: HashMap<String, String>,
188 #[serde(default)]
189 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
190 pub region_wal_options: HashMap<RegionNumber, String>,
191 #[serde(default)]
192 pub skip_wal_replay: bool,
193}
194
195impl OpenRegion {
196 pub fn new(
197 region_ident: RegionIdent,
198 path: &str,
199 region_options: HashMap<String, String>,
200 region_wal_options: HashMap<RegionNumber, String>,
201 skip_wal_replay: bool,
202 ) -> Self {
203 Self {
204 region_ident,
205 region_storage_path: path.to_string(),
206 region_options,
207 region_wal_options,
208 skip_wal_replay,
209 }
210 }
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
215pub struct DowngradeRegion {
216 pub region_id: RegionId,
218 #[serde(default)]
222 pub flush_timeout: Option<Duration>,
223}
224
225impl Display for DowngradeRegion {
226 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
227 write!(
228 f,
229 "DowngradeRegion(region_id={}, flush_timeout={:?})",
230 self.region_id, self.flush_timeout,
231 )
232 }
233}
234
235#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237pub struct UpgradeRegion {
238 pub region_id: RegionId,
240 pub last_entry_id: Option<u64>,
242 pub metadata_last_entry_id: Option<u64>,
244 #[serde(with = "humantime_serde")]
249 pub replay_timeout: Option<Duration>,
250 #[serde(default)]
252 pub location_id: Option<u64>,
253 #[serde(default, skip_serializing_if = "Option::is_none")]
254 pub replay_entry_id: Option<u64>,
255 #[serde(default, skip_serializing_if = "Option::is_none")]
256 pub metadata_replay_entry_id: Option<u64>,
257}
258
259impl UpgradeRegion {
260 pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
262 self.replay_entry_id = replay_entry_id;
263 self
264 }
265
266 pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
268 self.metadata_replay_entry_id = metadata_replay_entry_id;
269 self
270 }
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
274pub enum CacheIdent {
276 FlowId(FlowId),
277 FlowNodeAddressChange(u64),
279 FlowName(FlowName),
280 TableId(TableId),
281 TableName(TableName),
282 SchemaName(SchemaName),
283 CreateFlow(CreateFlow),
284 DropFlow(DropFlow),
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
288pub struct CreateFlow {
289 pub flow_id: FlowId,
291 pub source_table_ids: Vec<TableId>,
292 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
294}
295
296#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
297pub struct DropFlow {
298 pub flow_id: FlowId,
299 pub source_table_ids: Vec<TableId>,
300 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
306pub enum FlushStrategy {
307 Sync,
309 Async,
311}
312
313#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
315pub enum FlushErrorStrategy {
316 FailFast,
318 TryAll,
320}
321
322impl Default for FlushStrategy {
323 fn default() -> Self {
324 Self::Sync
325 }
326}
327
328impl Default for FlushErrorStrategy {
329 fn default() -> Self {
330 Self::FailFast
331 }
332}
333
334#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
337pub struct FlushRegions {
338 pub region_ids: Vec<RegionId>,
340 #[serde(default)]
342 pub strategy: FlushStrategy,
343 #[serde(default)]
345 pub error_strategy: FlushErrorStrategy,
346}
347
348impl FlushRegions {
349 pub fn sync_single(region_id: RegionId) -> Self {
351 Self {
352 region_ids: vec![region_id],
353 strategy: FlushStrategy::Sync,
354 error_strategy: FlushErrorStrategy::FailFast,
355 }
356 }
357
358 pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
360 Self {
361 region_ids,
362 strategy: FlushStrategy::Async,
363 error_strategy: FlushErrorStrategy::TryAll,
364 }
365 }
366
367 pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
369 Self {
370 region_ids,
371 strategy: FlushStrategy::Sync,
372 error_strategy,
373 }
374 }
375
376 pub fn is_single_region(&self) -> bool {
378 self.region_ids.len() == 1
379 }
380
381 pub fn single_region_id(&self) -> Option<RegionId> {
383 if self.is_single_region() {
384 self.region_ids.first().copied()
385 } else {
386 None
387 }
388 }
389
390 pub fn is_hint(&self) -> bool {
392 matches!(self.strategy, FlushStrategy::Async)
393 }
394
395 pub fn is_sync(&self) -> bool {
397 matches!(self.strategy, FlushStrategy::Sync)
398 }
399}
400
401impl From<RegionId> for FlushRegions {
402 fn from(region_id: RegionId) -> Self {
403 Self::sync_single(region_id)
404 }
405}
406
407#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
408pub enum Instruction {
409 OpenRegion(OpenRegion),
413 CloseRegion(RegionIdent),
417 UpgradeRegion(UpgradeRegion),
419 DowngradeRegion(DowngradeRegion),
421 InvalidateCaches(Vec<CacheIdent>),
423 FlushRegions(FlushRegions),
425}
426
427#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
429pub struct UpgradeRegionReply {
430 pub ready: bool,
432 pub exists: bool,
434 pub error: Option<String>,
436}
437
438impl Display for UpgradeRegionReply {
439 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
440 write!(
441 f,
442 "(ready={}, exists={}, error={:?})",
443 self.ready, self.exists, self.error
444 )
445 }
446}
447
448#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
449#[serde(tag = "type", rename_all = "snake_case")]
450pub enum InstructionReply {
451 OpenRegion(SimpleReply),
452 CloseRegion(SimpleReply),
453 UpgradeRegion(UpgradeRegionReply),
454 DowngradeRegion(DowngradeRegionReply),
455 FlushRegions(FlushRegionReply),
456}
457
458impl Display for InstructionReply {
459 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
460 match self {
461 Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
462 Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
463 Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
464 Self::DowngradeRegion(reply) => {
465 write!(f, "InstructionReply::DowngradeRegion({})", reply)
466 }
467 Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
468 }
469 }
470}
471
#[cfg(test)]
mod tests {
    use super::*;

    /// Region identity shared by the serialization tests.
    fn sample_region_ident() -> RegionIdent {
        RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        }
    }

    /// Serializes `instruction` to JSON and parses it back.
    fn json_round_trip(instruction: &Instruction) -> Instruction {
        let json = serde_json::to_string(instruction).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    /// Pins the exact JSON produced for `OpenRegion` and `CloseRegion`.
    #[test]
    fn test_serialize_instruction() {
        let open_region = Instruction::OpenRegion(OpenRegion::new(
            sample_region_ident(),
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
        ));
        let open_json = serde_json::to_string(&open_region).unwrap();
        assert_eq!(
            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
            open_json
        );

        let close_region = Instruction::CloseRegion(sample_region_ident());
        let close_json = serde_json::to_string(&close_region).unwrap();
        assert_eq!(
            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
            close_json
        );
    }

    /// Shape of `OpenRegion` with only its first three fields, used to check
    /// that such payloads still deserialize.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }

    /// A payload without `region_wal_options` / `skip_wal_replay` must
    /// deserialize into an `OpenRegion` with those fields defaulted.
    #[test]
    fn test_compatible_serialize_open_region() {
        let region_ident = sample_region_ident();
        let region_storage_path = String::from("test/foo");
        let mut region_options = HashMap::new();
        region_options.insert("a".to_string(), "aa".to_string());
        region_options.insert("b".to_string(), "bb".to_string());

        let legacy = LegacyOpenRegion {
            region_ident: region_ident.clone(),
            region_storage_path: region_storage_path.clone(),
            region_options: region_options.clone(),
        };
        let legacy_json = serde_json::to_string(&legacy).unwrap();

        let decoded: OpenRegion = serde_json::from_str(&legacy_json).unwrap();
        let expected = OpenRegion {
            region_ident,
            region_storage_path,
            region_options,
            region_wal_options: HashMap::new(),
            skip_wal_replay: false,
        };
        assert_eq!(expected, decoded);
    }

    /// Each constructor sets the expected strategy and error strategy.
    #[test]
    fn test_flush_regions_creation() {
        let first = RegionId::new(1024, 1);

        // Single synchronous request.
        let single = FlushRegions::sync_single(first);
        assert_eq!(single.region_ids, vec![first]);
        assert_eq!(single.strategy, FlushStrategy::Sync);
        assert!(!single.is_hint());
        assert!(single.is_sync());
        assert_eq!(single.error_strategy, FlushErrorStrategy::FailFast);
        assert!(single.is_single_region());
        assert_eq!(single.single_region_id(), Some(first));

        let both = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Asynchronous batch.
        let async_batch = FlushRegions::async_batch(both.clone());
        assert_eq!(async_batch.region_ids, both);
        assert_eq!(async_batch.strategy, FlushStrategy::Async);
        assert!(async_batch.is_hint());
        assert!(!async_batch.is_sync());
        assert_eq!(async_batch.error_strategy, FlushErrorStrategy::TryAll);
        assert!(!async_batch.is_single_region());
        assert_eq!(async_batch.single_region_id(), None);

        // Synchronous batch with an explicit error strategy.
        let sync_batch = FlushRegions::sync_batch(both.clone(), FlushErrorStrategy::FailFast);
        assert_eq!(sync_batch.region_ids, both);
        assert_eq!(sync_batch.strategy, FlushStrategy::Sync);
        assert!(!sync_batch.is_hint());
        assert!(sync_batch.is_sync());
        assert_eq!(sync_batch.error_strategy, FlushErrorStrategy::FailFast);
    }

    /// `From<RegionId>` yields a synchronous request; a literal struct keeps
    /// whatever strategy it was given.
    #[test]
    fn test_flush_regions_conversion() {
        let region_id = RegionId::new(1024, 1);

        let converted = FlushRegions::from(region_id);
        assert_eq!(converted.region_ids, vec![region_id]);
        assert_eq!(converted.strategy, FlushStrategy::Sync);
        assert!(!converted.is_hint());
        assert!(converted.is_sync());

        let literal = FlushRegions {
            region_ids: vec![region_id],
            strategy: FlushStrategy::Async,
            error_strategy: FlushErrorStrategy::TryAll,
        };
        assert_eq!(literal.region_ids, vec![region_id]);
        assert_eq!(literal.strategy, FlushStrategy::Async);
        assert!(literal.is_hint());
        assert!(!literal.is_sync());
    }

    /// Covers the single-region constructors, `from_results`, and the
    /// conversion into `SimpleReply`.
    #[test]
    fn test_flush_region_reply() {
        let first = RegionId::new(1024, 1);

        let succeeded = FlushRegionReply::success_single(first);
        assert!(succeeded.overall_success);
        assert_eq!(succeeded.results.len(), 1);
        assert_eq!(succeeded.results[0].0, first);
        assert!(succeeded.results[0].1.is_ok());

        let failed = FlushRegionReply::error_single(first, "test error".to_string());
        assert!(!failed.overall_success);
        assert_eq!(failed.results.len(), 1);
        assert_eq!(failed.results[0].0, first);
        assert!(failed.results[0].1.is_err());

        let second = RegionId::new(1024, 2);
        let mixed = FlushRegionReply::from_results(vec![
            (first, Ok(())),
            (second, Err("flush failed".to_string())),
        ]);
        assert!(!mixed.overall_success);
        assert_eq!(mixed.results.len(), 2);

        let simple = mixed.to_simple_reply();
        assert!(!simple.result);
        assert!(simple.error.is_some());
        assert!(simple.error.unwrap().contains("flush failed"));
    }

    /// A single-region flush instruction survives a JSON round trip.
    #[test]
    fn test_serialize_flush_regions_instruction() {
        let region_id = RegionId::new(1024, 1);
        let instruction = Instruction::FlushRegions(FlushRegions::sync_single(region_id));

        if let Instruction::FlushRegions(fr) = json_round_trip(&instruction) {
            assert_eq!(fr.region_ids, vec![region_id]);
            assert_eq!(fr.strategy, FlushStrategy::Sync);
            assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
        } else {
            panic!("Expected FlushRegions instruction");
        }
    }

    /// A batch flush instruction survives a JSON round trip.
    #[test]
    fn test_serialize_flush_regions_batch_instruction() {
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
        let instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
            region_ids.clone(),
            FlushErrorStrategy::TryAll,
        ));

        if let Instruction::FlushRegions(fr) = json_round_trip(&instruction) {
            assert_eq!(fr.region_ids, region_ids);
            assert_eq!(fr.strategy, FlushStrategy::Sync);
            assert!(!fr.is_hint());
            assert!(fr.is_sync());
            assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
        } else {
            panic!("Expected FlushRegions instruction");
        }
    }
}