1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33 pub datanode_id: DatanodeId,
34 pub table_id: TableId,
35 pub region_number: RegionNumber,
36 pub engine: String,
37}
38
39impl RegionIdent {
40 pub fn get_region_id(&self) -> RegionId {
41 RegionId::new(self.table_id, self.region_number)
42 }
43}
44
45impl Display for RegionIdent {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50 self.datanode_id, self.table_id, self.region_number, self.engine
51 )
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58 #[serde(default)]
61 pub region_id: RegionId,
62 pub last_entry_id: Option<u64>,
64 pub metadata_last_entry_id: Option<u64>,
66 pub exists: bool,
68 pub error: Option<String>,
70}
71
72impl Display for DowngradeRegionReply {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 write!(
75 f,
76 "(last_entry_id={:?}, exists={}, error={:?})",
77 self.last_entry_id, self.exists, self.error
78 )
79 }
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
83pub struct SimpleReply {
84 pub result: bool,
85 pub error: Option<String>,
86}
87
88#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
90pub struct FlushRegionReply {
91 pub results: Vec<(RegionId, Result<(), String>)>,
95 pub overall_success: bool,
97}
98
99impl FlushRegionReply {
100 pub fn success_single(region_id: RegionId) -> Self {
102 Self {
103 results: vec![(region_id, Ok(()))],
104 overall_success: true,
105 }
106 }
107
108 pub fn error_single(region_id: RegionId, error: String) -> Self {
110 Self {
111 results: vec![(region_id, Err(error))],
112 overall_success: false,
113 }
114 }
115
116 pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118 let overall_success = results.iter().all(|(_, result)| result.is_ok());
119 Self {
120 results,
121 overall_success,
122 }
123 }
124
125 pub fn to_simple_reply(&self) -> SimpleReply {
127 if self.overall_success {
128 SimpleReply {
129 result: true,
130 error: None,
131 }
132 } else {
133 let errors: Vec<String> = self
134 .results
135 .iter()
136 .filter_map(|(region_id, result)| {
137 result
138 .as_ref()
139 .err()
140 .map(|err| format!("{}: {}", region_id, err))
141 })
142 .collect();
143 SimpleReply {
144 result: false,
145 error: Some(errors.join("; ")),
146 }
147 }
148 }
149}
150
151impl Display for SimpleReply {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 write!(f, "(result={}, error={:?})", self.result, self.error)
154 }
155}
156
157impl Display for FlushRegionReply {
158 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159 let results_str = self
160 .results
161 .iter()
162 .map(|(region_id, result)| match result {
163 Ok(()) => format!("{}:OK", region_id),
164 Err(err) => format!("{}:ERR({})", region_id, err),
165 })
166 .collect::<Vec<_>>()
167 .join(", ");
168 write!(
169 f,
170 "(overall_success={}, results=[{}])",
171 self.overall_success, results_str
172 )
173 }
174}
175
176impl Display for OpenRegion {
177 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178 write!(
179 f,
180 "OpenRegion(region_ident={}, region_storage_path={})",
181 self.region_ident, self.region_storage_path
182 )
183 }
184}
185
186#[serde_with::serde_as]
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub struct OpenRegion {
189 pub region_ident: RegionIdent,
190 pub region_storage_path: String,
191 pub region_options: HashMap<String, String>,
192 #[serde(default)]
193 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
194 pub region_wal_options: HashMap<RegionNumber, String>,
195 #[serde(default)]
196 pub skip_wal_replay: bool,
197}
198
199impl OpenRegion {
200 pub fn new(
201 region_ident: RegionIdent,
202 path: &str,
203 region_options: HashMap<String, String>,
204 region_wal_options: HashMap<RegionNumber, String>,
205 skip_wal_replay: bool,
206 ) -> Self {
207 Self {
208 region_ident,
209 region_storage_path: path.to_string(),
210 region_options,
211 region_wal_options,
212 skip_wal_replay,
213 }
214 }
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
219pub struct DowngradeRegion {
220 pub region_id: RegionId,
222 #[serde(default)]
226 pub flush_timeout: Option<Duration>,
227}
228
229impl Display for DowngradeRegion {
230 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231 write!(
232 f,
233 "DowngradeRegion(region_id={}, flush_timeout={:?})",
234 self.region_id, self.flush_timeout,
235 )
236 }
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
241pub struct UpgradeRegion {
242 pub region_id: RegionId,
244 pub last_entry_id: Option<u64>,
246 pub metadata_last_entry_id: Option<u64>,
248 #[serde(with = "humantime_serde")]
253 pub replay_timeout: Duration,
254 #[serde(default)]
256 pub location_id: Option<u64>,
257 #[serde(default, skip_serializing_if = "Option::is_none")]
258 pub replay_entry_id: Option<u64>,
259 #[serde(default, skip_serializing_if = "Option::is_none")]
260 pub metadata_replay_entry_id: Option<u64>,
261}
262
263impl UpgradeRegion {
264 pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266 self.replay_entry_id = replay_entry_id;
267 self
268 }
269
270 pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272 self.metadata_replay_entry_id = metadata_replay_entry_id;
273 self
274 }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278pub enum CacheIdent {
280 FlowId(FlowId),
281 FlowNodeAddressChange(u64),
283 FlowName(FlowName),
284 TableId(TableId),
285 TableName(TableName),
286 SchemaName(SchemaName),
287 CreateFlow(CreateFlow),
288 DropFlow(DropFlow),
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
292pub struct CreateFlow {
293 pub flow_id: FlowId,
295 pub source_table_ids: Vec<TableId>,
296 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
301pub struct DropFlow {
302 pub flow_id: FlowId,
303 pub source_table_ids: Vec<TableId>,
304 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
306}
307
308#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
310pub enum FlushStrategy {
311 #[default]
313 Sync,
314 Async,
316}
317
318#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
320pub enum FlushErrorStrategy {
321 #[default]
323 FailFast,
324 TryAll,
326}
327
328#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
331pub struct FlushRegions {
332 pub region_ids: Vec<RegionId>,
334 #[serde(default)]
336 pub strategy: FlushStrategy,
337 #[serde(default)]
339 pub error_strategy: FlushErrorStrategy,
340}
341
342impl FlushRegions {
343 pub fn sync_single(region_id: RegionId) -> Self {
345 Self {
346 region_ids: vec![region_id],
347 strategy: FlushStrategy::Sync,
348 error_strategy: FlushErrorStrategy::FailFast,
349 }
350 }
351
352 pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
354 Self {
355 region_ids,
356 strategy: FlushStrategy::Async,
357 error_strategy: FlushErrorStrategy::TryAll,
358 }
359 }
360
361 pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
363 Self {
364 region_ids,
365 strategy: FlushStrategy::Sync,
366 error_strategy,
367 }
368 }
369
370 pub fn is_single_region(&self) -> bool {
372 self.region_ids.len() == 1
373 }
374
375 pub fn single_region_id(&self) -> Option<RegionId> {
377 if self.is_single_region() {
378 self.region_ids.first().copied()
379 } else {
380 None
381 }
382 }
383
384 pub fn is_hint(&self) -> bool {
386 matches!(self.strategy, FlushStrategy::Async)
387 }
388
389 pub fn is_sync(&self) -> bool {
391 matches!(self.strategy, FlushStrategy::Sync)
392 }
393}
394
395impl From<RegionId> for FlushRegions {
396 fn from(region_id: RegionId) -> Self {
397 Self::sync_single(region_id)
398 }
399}
400
401#[derive(Debug, Deserialize)]
402#[serde(untagged)]
403enum SingleOrMultiple<T> {
404 Single(T),
405 Multiple(Vec<T>),
406}
407
408fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
409where
410 D: Deserializer<'de>,
411 T: Deserialize<'de>,
412{
413 let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
414 Ok(match helper {
415 SingleOrMultiple::Single(x) => vec![x],
416 SingleOrMultiple::Multiple(xs) => xs,
417 })
418}
419
420#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
422pub struct GetFileRefs {
423 pub region_ids: Vec<RegionId>,
425}
426
427impl Display for GetFileRefs {
428 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
429 write!(f, "GetFileRefs(region_ids={:?})", self.region_ids)
430 }
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
435pub struct GcRegions {
436 pub regions: Vec<RegionId>,
438 pub file_refs_manifest: FileRefsManifest,
440 pub full_file_listing: bool,
442}
443
444impl Display for GcRegions {
445 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
446 write!(
447 f,
448 "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
449 self.regions,
450 self.file_refs_manifest.file_refs.len(),
451 self.full_file_listing
452 )
453 }
454}
455
456#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
458pub struct GetFileRefsReply {
459 pub file_refs_manifest: FileRefsManifest,
461 pub success: bool,
463 pub error: Option<String>,
465}
466
467impl Display for GetFileRefsReply {
468 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
469 write!(
470 f,
471 "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
472 self.success,
473 self.file_refs_manifest.file_refs.len(),
474 self.error
475 )
476 }
477}
478
479#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
481pub struct GcRegionsReply {
482 pub result: Result<GcReport, String>,
483}
484
485impl Display for GcRegionsReply {
486 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
487 write!(
488 f,
489 "GcReply(result={})",
490 match &self.result {
491 Ok(report) => format!(
492 "GcReport(deleted_files_count={}, need_retry_regions_count={})",
493 report.deleted_files.len(),
494 report.need_retry_regions.len()
495 ),
496 Err(err) => format!("Err({})", err),
497 }
498 )
499 }
500}
501
502#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
503pub enum Instruction {
504 #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
506 OpenRegions(Vec<OpenRegion>),
507 #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
509 CloseRegions(Vec<RegionIdent>),
510 #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
512 UpgradeRegions(Vec<UpgradeRegion>),
513 #[serde(
514 deserialize_with = "single_or_multiple_from",
515 alias = "DowngradeRegion"
516 )]
517 DowngradeRegions(Vec<DowngradeRegion>),
519 InvalidateCaches(Vec<CacheIdent>),
521 FlushRegions(FlushRegions),
523 GetFileRefs(GetFileRefs),
525 GcRegions(GcRegions),
527}
528
529impl Instruction {
530 pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
532 match self {
533 Self::OpenRegions(open_regions) => Some(open_regions),
534 _ => None,
535 }
536 }
537
538 pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
540 match self {
541 Self::CloseRegions(close_regions) => Some(close_regions),
542 _ => None,
543 }
544 }
545
546 pub fn into_flush_regions(self) -> Option<FlushRegions> {
548 match self {
549 Self::FlushRegions(flush_regions) => Some(flush_regions),
550 _ => None,
551 }
552 }
553
554 pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
556 match self {
557 Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
558 _ => None,
559 }
560 }
561
562 pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
564 match self {
565 Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
566 _ => None,
567 }
568 }
569
570 pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
571 match self {
572 Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
573 _ => None,
574 }
575 }
576
577 pub fn into_gc_regions(self) -> Option<GcRegions> {
578 match self {
579 Self::GcRegions(gc_regions) => Some(gc_regions),
580 _ => None,
581 }
582 }
583}
584
585#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
587pub struct UpgradeRegionReply {
588 #[serde(default)]
591 pub region_id: RegionId,
592 pub ready: bool,
594 pub exists: bool,
596 pub error: Option<String>,
598}
599
600impl Display for UpgradeRegionReply {
601 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
602 write!(
603 f,
604 "(ready={}, exists={}, error={:?})",
605 self.ready, self.exists, self.error
606 )
607 }
608}
609
610#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
611pub struct DowngradeRegionsReply {
612 pub replies: Vec<DowngradeRegionReply>,
613}
614
615impl DowngradeRegionsReply {
616 pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
617 Self { replies }
618 }
619
620 pub fn single(reply: DowngradeRegionReply) -> Self {
621 Self::new(vec![reply])
622 }
623}
624
625#[derive(Deserialize)]
626#[serde(untagged)]
627enum DowngradeRegionsCompat {
628 Single(DowngradeRegionReply),
629 Multiple(DowngradeRegionsReply),
630}
631
632fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
633where
634 D: Deserializer<'de>,
635{
636 let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
637 Ok(match helper {
638 DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
639 DowngradeRegionsCompat::Multiple(reply) => reply,
640 })
641}
642
643#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
644pub struct UpgradeRegionsReply {
645 pub replies: Vec<UpgradeRegionReply>,
646}
647
648impl UpgradeRegionsReply {
649 pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
650 Self { replies }
651 }
652
653 pub fn single(reply: UpgradeRegionReply) -> Self {
654 Self::new(vec![reply])
655 }
656}
657
658#[derive(Deserialize)]
659#[serde(untagged)]
660enum UpgradeRegionsCompat {
661 Single(UpgradeRegionReply),
662 Multiple(UpgradeRegionsReply),
663}
664
665fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
666where
667 D: Deserializer<'de>,
668{
669 let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
670 Ok(match helper {
671 UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
672 UpgradeRegionsCompat::Multiple(reply) => reply,
673 })
674}
675
676#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
677#[serde(tag = "type", rename_all = "snake_case")]
678pub enum InstructionReply {
679 #[serde(alias = "open_region")]
680 OpenRegions(SimpleReply),
681 #[serde(alias = "close_region")]
682 CloseRegions(SimpleReply),
683 #[serde(
684 deserialize_with = "upgrade_regions_compat_from",
685 alias = "upgrade_region"
686 )]
687 UpgradeRegions(UpgradeRegionsReply),
688 #[serde(
689 alias = "downgrade_region",
690 deserialize_with = "downgrade_regions_compat_from"
691 )]
692 DowngradeRegions(DowngradeRegionsReply),
693 FlushRegions(FlushRegionReply),
694 GetFileRefs(GetFileRefsReply),
695 GcRegions(GcRegionsReply),
696}
697
698impl Display for InstructionReply {
699 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
700 match self {
701 Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
702 Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
703 Self::UpgradeRegions(reply) => {
704 write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
705 }
706 Self::DowngradeRegions(reply) => {
707 write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
708 }
709 Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
710 Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
711 Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
712 }
713 }
714}
715
716#[cfg(any(test, feature = "testing"))]
717impl InstructionReply {
718 pub fn expect_close_regions_reply(self) -> SimpleReply {
719 match self {
720 Self::CloseRegions(reply) => reply,
721 _ => panic!("Expected CloseRegions reply"),
722 }
723 }
724
725 pub fn expect_open_regions_reply(self) -> SimpleReply {
726 match self {
727 Self::OpenRegions(reply) => reply,
728 _ => panic!("Expected OpenRegions reply"),
729 }
730 }
731
732 pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
733 match self {
734 Self::UpgradeRegions(reply) => reply.replies,
735 _ => panic!("Expected UpgradeRegion reply"),
736 }
737 }
738
739 pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
740 match self {
741 Self::DowngradeRegions(reply) => reply.replies,
742 _ => panic!("Expected DowngradeRegion reply"),
743 }
744 }
745
746 pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
747 match self {
748 Self::FlushRegions(reply) => reply,
749 _ => panic!("Expected FlushRegions reply"),
750 }
751 }
752}
753
754#[cfg(test)]
755mod tests {
756 use std::collections::HashSet;
757
758 use store_api::storage::FileId;
759
760 use super::*;
761
762 #[test]
763 fn test_serialize_instruction() {
764 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
765 RegionIdent {
766 datanode_id: 2,
767 table_id: 1024,
768 region_number: 1,
769 engine: "mito2".to_string(),
770 },
771 "test/foo",
772 HashMap::new(),
773 HashMap::new(),
774 false,
775 )]);
776
777 let serialized = serde_json::to_string(&open_region).unwrap();
778 assert_eq!(
779 r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
780 serialized
781 );
782
783 let close_region = Instruction::CloseRegions(vec![RegionIdent {
784 datanode_id: 2,
785 table_id: 1024,
786 region_number: 1,
787 engine: "mito2".to_string(),
788 }]);
789
790 let serialized = serde_json::to_string(&close_region).unwrap();
791 assert_eq!(
792 r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
793 serialized
794 );
795
796 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
797 region_id: RegionId::new(1024, 1),
798 last_entry_id: None,
799 metadata_last_entry_id: None,
800 replay_timeout: Duration::from_millis(1000),
801 location_id: None,
802 replay_entry_id: None,
803 metadata_replay_entry_id: None,
804 }]);
805
806 let serialized = serde_json::to_string(&upgrade_region).unwrap();
807 assert_eq!(
808 r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
809 serialized
810 );
811 }
812
813 #[test]
814 fn test_serialize_instruction_reply() {
815 let downgrade_region_reply = InstructionReply::DowngradeRegions(
816 DowngradeRegionsReply::single(DowngradeRegionReply {
817 region_id: RegionId::new(1024, 1),
818 last_entry_id: None,
819 metadata_last_entry_id: None,
820 exists: true,
821 error: None,
822 }),
823 );
824
825 let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
826 assert_eq!(
827 r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
828 serialized
829 );
830
831 let upgrade_region_reply =
832 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
833 region_id: RegionId::new(1024, 1),
834 ready: true,
835 exists: true,
836 error: None,
837 }));
838 let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
839 assert_eq!(
840 r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
841 serialized
842 );
843 }
844
845 #[test]
846 fn test_deserialize_instruction() {
847 let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
849 let open_region_instruction: Instruction =
850 serde_json::from_str(open_region_instruction).unwrap();
851 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
852 RegionIdent {
853 datanode_id: 2,
854 table_id: 1024,
855 region_number: 1,
856 engine: "mito2".to_string(),
857 },
858 "test/foo",
859 HashMap::new(),
860 HashMap::new(),
861 false,
862 )]);
863 assert_eq!(open_region_instruction, open_region);
864
865 let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
867 let close_region_instruction: Instruction =
868 serde_json::from_str(close_region_instruction).unwrap();
869 let close_region = Instruction::CloseRegions(vec![RegionIdent {
870 datanode_id: 2,
871 table_id: 1024,
872 region_number: 1,
873 engine: "mito2".to_string(),
874 }]);
875 assert_eq!(close_region_instruction, close_region);
876
877 let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
879 let downgrade_region_instruction: Instruction =
880 serde_json::from_str(downgrade_region_instruction).unwrap();
881 let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
882 region_id: RegionId::new(1024, 1),
883 flush_timeout: Some(Duration::from_millis(1000)),
884 }]);
885 assert_eq!(downgrade_region_instruction, downgrade_region);
886
887 let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
889 let upgrade_region_instruction: Instruction =
890 serde_json::from_str(upgrade_region_instruction).unwrap();
891 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
892 region_id: RegionId::new(1024, 1),
893 last_entry_id: None,
894 metadata_last_entry_id: None,
895 replay_timeout: Duration::from_millis(1000),
896 location_id: None,
897 replay_entry_id: None,
898 metadata_replay_entry_id: None,
899 }]);
900 assert_eq!(upgrade_region_instruction, upgrade_region);
901 }
902
903 #[test]
904 fn test_deserialize_instruction_reply() {
905 let close_region_instruction_reply =
907 r#"{"result":true,"error":null,"type":"close_region"}"#;
908 let close_region_instruction_reply: InstructionReply =
909 serde_json::from_str(close_region_instruction_reply).unwrap();
910 let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
911 result: true,
912 error: None,
913 });
914 assert_eq!(close_region_instruction_reply, close_region_reply);
915
916 let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
918 let open_region_instruction_reply: InstructionReply =
919 serde_json::from_str(open_region_instruction_reply).unwrap();
920 let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
921 result: true,
922 error: None,
923 });
924 assert_eq!(open_region_instruction_reply, open_region_reply);
925
926 let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
928 let downgrade_region_instruction_reply: InstructionReply =
929 serde_json::from_str(downgrade_region_instruction_reply).unwrap();
930 let downgrade_region_reply = InstructionReply::DowngradeRegions(
931 DowngradeRegionsReply::single(DowngradeRegionReply {
932 region_id: RegionId::new(1024, 1),
933 last_entry_id: None,
934 metadata_last_entry_id: None,
935 exists: true,
936 error: None,
937 }),
938 );
939 assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
940
941 let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
943 let upgrade_region_instruction_reply: InstructionReply =
944 serde_json::from_str(upgrade_region_instruction_reply).unwrap();
945 let upgrade_region_reply =
946 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
947 region_id: RegionId::new(1024, 1),
948 ready: true,
949 exists: true,
950 error: None,
951 }));
952 assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
953 }
954
955 #[derive(Debug, Clone, Serialize, Deserialize)]
956 struct LegacyOpenRegion {
957 region_ident: RegionIdent,
958 region_storage_path: String,
959 region_options: HashMap<String, String>,
960 }
961
962 #[test]
963 fn test_compatible_serialize_open_region() {
964 let region_ident = RegionIdent {
965 datanode_id: 2,
966 table_id: 1024,
967 region_number: 1,
968 engine: "mito2".to_string(),
969 };
970 let region_storage_path = "test/foo".to_string();
971 let region_options = HashMap::from([
972 ("a".to_string(), "aa".to_string()),
973 ("b".to_string(), "bb".to_string()),
974 ]);
975
976 let legacy_open_region = LegacyOpenRegion {
978 region_ident: region_ident.clone(),
979 region_storage_path: region_storage_path.clone(),
980 region_options: region_options.clone(),
981 };
982 let serialized = serde_json::to_string(&legacy_open_region).unwrap();
983
984 let deserialized = serde_json::from_str(&serialized).unwrap();
986 let expected = OpenRegion {
987 region_ident,
988 region_storage_path,
989 region_options,
990 region_wal_options: HashMap::new(),
991 skip_wal_replay: false,
992 };
993 assert_eq!(expected, deserialized);
994 }
995
996 #[test]
997 fn test_flush_regions_creation() {
998 let region_id = RegionId::new(1024, 1);
999
1000 let single_sync = FlushRegions::sync_single(region_id);
1002 assert_eq!(single_sync.region_ids, vec![region_id]);
1003 assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1004 assert!(!single_sync.is_hint());
1005 assert!(single_sync.is_sync());
1006 assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1007 assert!(single_sync.is_single_region());
1008 assert_eq!(single_sync.single_region_id(), Some(region_id));
1009
1010 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1012 let batch_async = FlushRegions::async_batch(region_ids.clone());
1013 assert_eq!(batch_async.region_ids, region_ids);
1014 assert_eq!(batch_async.strategy, FlushStrategy::Async);
1015 assert!(batch_async.is_hint());
1016 assert!(!batch_async.is_sync());
1017 assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1018 assert!(!batch_async.is_single_region());
1019 assert_eq!(batch_async.single_region_id(), None);
1020
1021 let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1023 assert_eq!(batch_sync.region_ids, region_ids);
1024 assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1025 assert!(!batch_sync.is_hint());
1026 assert!(batch_sync.is_sync());
1027 assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1028 }
1029
1030 #[test]
1031 fn test_flush_regions_conversion() {
1032 let region_id = RegionId::new(1024, 1);
1033
1034 let from_region_id: FlushRegions = region_id.into();
1035 assert_eq!(from_region_id.region_ids, vec![region_id]);
1036 assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1037 assert!(!from_region_id.is_hint());
1038 assert!(from_region_id.is_sync());
1039
1040 let flush_regions = FlushRegions {
1042 region_ids: vec![region_id],
1043 strategy: FlushStrategy::Async,
1044 error_strategy: FlushErrorStrategy::TryAll,
1045 };
1046 assert_eq!(flush_regions.region_ids, vec![region_id]);
1047 assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1048 assert!(flush_regions.is_hint());
1049 assert!(!flush_regions.is_sync());
1050 }
1051
1052 #[test]
1053 fn test_flush_region_reply() {
1054 let region_id = RegionId::new(1024, 1);
1055
1056 let success_reply = FlushRegionReply::success_single(region_id);
1058 assert!(success_reply.overall_success);
1059 assert_eq!(success_reply.results.len(), 1);
1060 assert_eq!(success_reply.results[0].0, region_id);
1061 assert!(success_reply.results[0].1.is_ok());
1062
1063 let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1065 assert!(!error_reply.overall_success);
1066 assert_eq!(error_reply.results.len(), 1);
1067 assert_eq!(error_reply.results[0].0, region_id);
1068 assert!(error_reply.results[0].1.is_err());
1069
1070 let region_id2 = RegionId::new(1024, 2);
1072 let results = vec![
1073 (region_id, Ok(())),
1074 (region_id2, Err("flush failed".to_string())),
1075 ];
1076 let batch_reply = FlushRegionReply::from_results(results);
1077 assert!(!batch_reply.overall_success);
1078 assert_eq!(batch_reply.results.len(), 2);
1079
1080 let simple_reply = batch_reply.to_simple_reply();
1082 assert!(!simple_reply.result);
1083 assert!(simple_reply.error.is_some());
1084 assert!(simple_reply.error.unwrap().contains("flush failed"));
1085 }
1086
1087 #[test]
1088 fn test_serialize_flush_regions_instruction() {
1089 let region_id = RegionId::new(1024, 1);
1090 let flush_regions = FlushRegions::sync_single(region_id);
1091 let instruction = Instruction::FlushRegions(flush_regions.clone());
1092
1093 let serialized = serde_json::to_string(&instruction).unwrap();
1094 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1095
1096 match deserialized {
1097 Instruction::FlushRegions(fr) => {
1098 assert_eq!(fr.region_ids, vec![region_id]);
1099 assert_eq!(fr.strategy, FlushStrategy::Sync);
1100 assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1101 }
1102 _ => panic!("Expected FlushRegions instruction"),
1103 }
1104 }
1105
1106 #[test]
1107 fn test_serialize_flush_regions_batch_instruction() {
1108 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1109 let flush_regions =
1110 FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1111 let instruction = Instruction::FlushRegions(flush_regions);
1112
1113 let serialized = serde_json::to_string(&instruction).unwrap();
1114 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1115
1116 match deserialized {
1117 Instruction::FlushRegions(fr) => {
1118 assert_eq!(fr.region_ids, region_ids);
1119 assert_eq!(fr.strategy, FlushStrategy::Sync);
1120 assert!(!fr.is_hint());
1121 assert!(fr.is_sync());
1122 assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1123 }
1124 _ => panic!("Expected FlushRegions instruction"),
1125 }
1126 }
1127
1128 #[test]
1129 fn test_serialize_get_file_refs_instruction_reply() {
1130 let mut manifest = FileRefsManifest::default();
1131 let r0 = RegionId::new(1024, 1);
1132 let r1 = RegionId::new(1024, 2);
1133 manifest
1134 .file_refs
1135 .insert(r0, HashSet::from([FileId::random()]));
1136 manifest
1137 .file_refs
1138 .insert(r1, HashSet::from([FileId::random()]));
1139 manifest.manifest_version.insert(r0, 10);
1140 manifest.manifest_version.insert(r1, 20);
1141
1142 let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1143 file_refs_manifest: manifest,
1144 success: true,
1145 error: None,
1146 });
1147
1148 let serialized = serde_json::to_string(&instruction_reply).unwrap();
1149 let deserialized = serde_json::from_str(&serialized).unwrap();
1150
1151 assert_eq!(instruction_reply, deserialized);
1152 }
1153}