1use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use store_api::region_engine::SyncRegionFromRequest;
21use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
22use strum::Display;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::flow_name::FlowName;
27use crate::key::schema_name::SchemaName;
28use crate::key::{FlowId, FlowPartitionId};
29use crate::peer::Peer;
30use crate::{DatanodeId, FlownodeId};
31
32#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
33pub struct RegionIdent {
34 pub datanode_id: DatanodeId,
35 pub table_id: TableId,
36 pub region_number: RegionNumber,
37 pub engine: String,
38}
39
40impl RegionIdent {
41 pub fn get_region_id(&self) -> RegionId {
42 RegionId::new(self.table_id, self.region_number)
43 }
44}
45
46impl Display for RegionIdent {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 write!(
49 f,
50 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
51 self.datanode_id, self.table_id, self.region_number, self.engine
52 )
53 }
54}
55
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
58pub struct DowngradeRegionReply {
59 #[serde(default)]
62 pub region_id: RegionId,
63 pub last_entry_id: Option<u64>,
65 pub metadata_last_entry_id: Option<u64>,
67 pub exists: bool,
69 pub error: Option<String>,
71}
72
73impl Display for DowngradeRegionReply {
74 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75 write!(
76 f,
77 "(last_entry_id={:?}, exists={}, error={:?})",
78 self.last_entry_id, self.exists, self.error
79 )
80 }
81}
82
83#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
84pub struct SimpleReply {
85 pub result: bool,
86 pub error: Option<String>,
87}
88
89#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
91pub struct FlushRegionReply {
92 pub results: Vec<(RegionId, Result<(), String>)>,
96 pub overall_success: bool,
98}
99
100impl FlushRegionReply {
101 pub fn success_single(region_id: RegionId) -> Self {
103 Self {
104 results: vec![(region_id, Ok(()))],
105 overall_success: true,
106 }
107 }
108
109 pub fn error_single(region_id: RegionId, error: String) -> Self {
111 Self {
112 results: vec![(region_id, Err(error))],
113 overall_success: false,
114 }
115 }
116
117 pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
119 let overall_success = results.iter().all(|(_, result)| result.is_ok());
120 Self {
121 results,
122 overall_success,
123 }
124 }
125
126 pub fn to_simple_reply(&self) -> SimpleReply {
128 if self.overall_success {
129 SimpleReply {
130 result: true,
131 error: None,
132 }
133 } else {
134 let errors: Vec<String> = self
135 .results
136 .iter()
137 .filter_map(|(region_id, result)| {
138 result
139 .as_ref()
140 .err()
141 .map(|err| format!("{}: {}", region_id, err))
142 })
143 .collect();
144 SimpleReply {
145 result: false,
146 error: Some(errors.join("; ")),
147 }
148 }
149 }
150}
151
152impl Display for SimpleReply {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(f, "(result={}, error={:?})", self.result, self.error)
155 }
156}
157
158impl Display for FlushRegionReply {
159 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
160 let results_str = self
161 .results
162 .iter()
163 .map(|(region_id, result)| match result {
164 Ok(()) => format!("{}:OK", region_id),
165 Err(err) => format!("{}:ERR({})", region_id, err),
166 })
167 .collect::<Vec<_>>()
168 .join(", ");
169 write!(
170 f,
171 "(overall_success={}, results=[{}])",
172 self.overall_success, results_str
173 )
174 }
175}
176
177impl Display for OpenRegion {
178 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
179 write!(
180 f,
181 "OpenRegion(region_ident={}, region_storage_path={})",
182 self.region_ident, self.region_storage_path
183 )
184 }
185}
186
187#[serde_with::serde_as]
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub struct OpenRegion {
190 pub region_ident: RegionIdent,
191 pub region_storage_path: String,
192 pub region_options: HashMap<String, String>,
193 #[serde(default)]
194 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
195 pub region_wal_options: HashMap<RegionNumber, String>,
196 #[serde(default)]
197 pub skip_wal_replay: bool,
198}
199
200impl OpenRegion {
201 pub fn new(
202 region_ident: RegionIdent,
203 path: &str,
204 region_options: HashMap<String, String>,
205 region_wal_options: HashMap<RegionNumber, String>,
206 skip_wal_replay: bool,
207 ) -> Self {
208 Self {
209 region_ident,
210 region_storage_path: path.to_string(),
211 region_options,
212 region_wal_options,
213 skip_wal_replay,
214 }
215 }
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
220pub struct DowngradeRegion {
221 pub region_id: RegionId,
223 #[serde(default)]
227 pub flush_timeout: Option<Duration>,
228}
229
230impl Display for DowngradeRegion {
231 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232 write!(
233 f,
234 "DowngradeRegion(region_id={}, flush_timeout={:?})",
235 self.region_id, self.flush_timeout,
236 )
237 }
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
242pub struct UpgradeRegion {
243 pub region_id: RegionId,
245 pub last_entry_id: Option<u64>,
247 pub metadata_last_entry_id: Option<u64>,
249 #[serde(with = "humantime_serde")]
254 pub replay_timeout: Duration,
255 #[serde(default)]
257 pub location_id: Option<u64>,
258 #[serde(default, skip_serializing_if = "Option::is_none")]
259 pub replay_entry_id: Option<u64>,
260 #[serde(default, skip_serializing_if = "Option::is_none")]
261 pub metadata_replay_entry_id: Option<u64>,
262}
263
264impl UpgradeRegion {
265 pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
267 self.replay_entry_id = replay_entry_id;
268 self
269 }
270
271 pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
273 self.metadata_replay_entry_id = metadata_replay_entry_id;
274 self
275 }
276}
277
278#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
279pub enum CacheIdent {
281 FlowId(FlowId),
282 FlowNodeAddressChange(u64),
284 FlowName(FlowName),
285 TableId(TableId),
286 TableName(TableName),
287 SchemaName(SchemaName),
288 CreateFlow(CreateFlow),
289 DropFlow(DropFlow),
290}
291
292#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
293pub struct CreateFlow {
294 pub flow_id: FlowId,
296 pub source_table_ids: Vec<TableId>,
297 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
299}
300
301#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
302pub struct DropFlow {
303 pub flow_id: FlowId,
304 pub source_table_ids: Vec<TableId>,
305 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
311pub enum FlushStrategy {
312 #[default]
314 Sync,
315 Async,
317}
318
319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
321pub enum FlushErrorStrategy {
322 #[default]
324 FailFast,
325 TryAll,
327}
328
329#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
332pub struct FlushRegions {
333 pub region_ids: Vec<RegionId>,
335 #[serde(default)]
337 pub strategy: FlushStrategy,
338 #[serde(default)]
340 pub error_strategy: FlushErrorStrategy,
341}
342
343impl Display for FlushRegions {
344 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
345 write!(
346 f,
347 "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
348 self.region_ids, self.strategy, self.error_strategy
349 )
350 }
351}
352
353impl FlushRegions {
354 pub fn sync_single(region_id: RegionId) -> Self {
356 Self {
357 region_ids: vec![region_id],
358 strategy: FlushStrategy::Sync,
359 error_strategy: FlushErrorStrategy::FailFast,
360 }
361 }
362
363 pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
365 Self {
366 region_ids,
367 strategy: FlushStrategy::Async,
368 error_strategy: FlushErrorStrategy::TryAll,
369 }
370 }
371
372 pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
374 Self {
375 region_ids,
376 strategy: FlushStrategy::Sync,
377 error_strategy,
378 }
379 }
380
381 pub fn is_single_region(&self) -> bool {
383 self.region_ids.len() == 1
384 }
385
386 pub fn single_region_id(&self) -> Option<RegionId> {
388 if self.is_single_region() {
389 self.region_ids.first().copied()
390 } else {
391 None
392 }
393 }
394
395 pub fn is_hint(&self) -> bool {
397 matches!(self.strategy, FlushStrategy::Async)
398 }
399
400 pub fn is_sync(&self) -> bool {
402 matches!(self.strategy, FlushStrategy::Sync)
403 }
404}
405
406impl From<RegionId> for FlushRegions {
407 fn from(region_id: RegionId) -> Self {
408 Self::sync_single(region_id)
409 }
410}
411
412#[derive(Debug, Deserialize)]
413#[serde(untagged)]
414enum SingleOrMultiple<T> {
415 Single(T),
416 Multiple(Vec<T>),
417}
418
419fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
420where
421 D: Deserializer<'de>,
422 T: Deserialize<'de>,
423{
424 let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
425 Ok(match helper {
426 SingleOrMultiple::Single(x) => vec![x],
427 SingleOrMultiple::Multiple(xs) => xs,
428 })
429}
430
431#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
433pub struct GetFileRefs {
434 pub query_regions: Vec<RegionId>,
436 pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
441}
442
443impl Display for GetFileRefs {
444 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445 write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
446 }
447}
448
449#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
451pub struct GcRegions {
452 pub regions: Vec<RegionId>,
454 pub file_refs_manifest: FileRefsManifest,
456 pub full_file_listing: bool,
458}
459
460impl Display for GcRegions {
461 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
462 write!(
463 f,
464 "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
465 self.regions,
466 self.file_refs_manifest.file_refs.len(),
467 self.full_file_listing
468 )
469 }
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
474pub struct GetFileRefsReply {
475 pub file_refs_manifest: FileRefsManifest,
477 pub success: bool,
479 pub error: Option<String>,
481}
482
483impl Display for GetFileRefsReply {
484 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
485 write!(
486 f,
487 "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
488 self.success,
489 self.file_refs_manifest.file_refs.len(),
490 self.error
491 )
492 }
493}
494
495#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
497pub struct GcRegionsReply {
498 pub result: Result<GcReport, String>,
499}
500
501impl Display for GcRegionsReply {
502 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
503 write!(
504 f,
505 "GcReply(result={})",
506 match &self.result {
507 Ok(report) => format!(
508 "GcReport(deleted_files_count={}, need_retry_regions_count={})",
509 report.deleted_files.len(),
510 report.need_retry_regions.len()
511 ),
512 Err(err) => format!("Err({})", err),
513 }
514 )
515 }
516}
517
518#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
519pub struct EnterStagingRegion {
520 pub region_id: RegionId,
521 #[serde(
522 alias = "partition_expr",
523 deserialize_with = "deserialize_enter_staging_partition_directive",
524 serialize_with = "serialize_enter_staging_partition_directive"
525 )]
526 pub partition_directive: StagingPartitionDirective,
527}
528
529impl Display for EnterStagingRegion {
530 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
531 write!(
532 f,
533 "EnterStagingRegion(region_id={}, partition_directive={})",
534 self.region_id, self.partition_directive
535 )
536 }
537}
538
539#[derive(Debug, Clone, PartialEq, Eq)]
540pub enum StagingPartitionDirective {
541 UpdatePartitionExpr(String),
542 RejectAllWrites,
543}
544
545impl StagingPartitionDirective {
546 pub fn as_partition_expr(&self) -> Option<&str> {
548 match self {
549 Self::UpdatePartitionExpr(expr) => Some(expr),
550 Self::RejectAllWrites => None,
551 }
552 }
553}
554
555impl Display for StagingPartitionDirective {
556 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
557 match self {
558 Self::UpdatePartitionExpr(expr) => write!(f, "UpdatePartitionExpr({})", expr),
559 Self::RejectAllWrites => write!(f, "RejectAllWrites"),
560 }
561 }
562}
563
564fn serialize_enter_staging_partition_directive<S>(
565 rule: &StagingPartitionDirective,
566 serializer: S,
567) -> std::result::Result<S::Ok, S::Error>
568where
569 S: Serializer,
570{
571 match rule {
572 StagingPartitionDirective::UpdatePartitionExpr(expr) => serializer.serialize_str(expr),
573 StagingPartitionDirective::RejectAllWrites => {
574 #[derive(Serialize)]
575 struct RejectAllWritesSer<'a> {
576 r#type: &'a str,
577 }
578
579 RejectAllWritesSer {
580 r#type: "reject_all_writes",
581 }
582 .serialize(serializer)
583 }
584 }
585}
586
587fn deserialize_enter_staging_partition_directive<'de, D>(
588 deserializer: D,
589) -> std::result::Result<StagingPartitionDirective, D::Error>
590where
591 D: Deserializer<'de>,
592{
593 #[derive(Deserialize)]
594 #[serde(untagged)]
595 enum Compat {
596 Legacy(String),
597 TypeTagged { r#type: String },
598 }
599
600 match Compat::deserialize(deserializer)? {
601 Compat::Legacy(expr) => Ok(StagingPartitionDirective::UpdatePartitionExpr(expr)),
602 Compat::TypeTagged { r#type } if r#type == "reject_all_writes" => {
603 Ok(StagingPartitionDirective::RejectAllWrites)
604 }
605 Compat::TypeTagged { r#type } => Err(serde::de::Error::custom(format!(
606 "Unknown enter staging partition directive type: {}",
607 r#type
608 ))),
609 }
610}
611
612#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
614pub struct SyncRegion {
615 pub region_id: RegionId,
617 pub request: SyncRegionFromRequest,
619}
620
621impl Display for SyncRegion {
622 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
623 write!(
624 f,
625 "SyncRegion(region_id={}, request={:?})",
626 self.region_id, self.request
627 )
628 }
629}
630
631#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
632pub struct RemapManifest {
633 pub region_id: RegionId,
634 pub input_regions: Vec<RegionId>,
636 pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
638 pub new_partition_exprs: HashMap<RegionId, String>,
640}
641
642impl Display for RemapManifest {
643 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
644 write!(
645 f,
646 "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
647 self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
648 )
649 }
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
653pub struct ApplyStagingManifest {
654 pub region_id: RegionId,
656 pub partition_expr: String,
658 pub central_region_id: RegionId,
660 pub manifest_path: String,
662}
663
664impl Display for ApplyStagingManifest {
665 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
666 write!(
667 f,
668 "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
669 self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
670 )
671 }
672}
673
674#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
675pub enum Instruction {
676 #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
678 OpenRegions(Vec<OpenRegion>),
679 #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
681 CloseRegions(Vec<RegionIdent>),
682 #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
684 UpgradeRegions(Vec<UpgradeRegion>),
685 #[serde(
686 deserialize_with = "single_or_multiple_from",
687 alias = "DowngradeRegion"
688 )]
689 DowngradeRegions(Vec<DowngradeRegion>),
691 InvalidateCaches(Vec<CacheIdent>),
693 FlushRegions(FlushRegions),
695 GetFileRefs(GetFileRefs),
697 GcRegions(GcRegions),
699 Suspend,
701 EnterStagingRegions(Vec<EnterStagingRegion>),
703 SyncRegions(Vec<SyncRegion>),
705 RemapManifest(RemapManifest),
707
708 ApplyStagingManifests(Vec<ApplyStagingManifest>),
710}
711
712impl Instruction {
713 pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
715 match self {
716 Self::OpenRegions(open_regions) => Some(open_regions),
717 _ => None,
718 }
719 }
720
721 pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
723 match self {
724 Self::CloseRegions(close_regions) => Some(close_regions),
725 _ => None,
726 }
727 }
728
729 pub fn into_flush_regions(self) -> Option<FlushRegions> {
731 match self {
732 Self::FlushRegions(flush_regions) => Some(flush_regions),
733 _ => None,
734 }
735 }
736
737 pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
739 match self {
740 Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
741 _ => None,
742 }
743 }
744
745 pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
747 match self {
748 Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
749 _ => None,
750 }
751 }
752
753 pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
754 match self {
755 Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
756 _ => None,
757 }
758 }
759
760 pub fn into_gc_regions(self) -> Option<GcRegions> {
761 match self {
762 Self::GcRegions(gc_regions) => Some(gc_regions),
763 _ => None,
764 }
765 }
766
767 pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
768 match self {
769 Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
770 _ => None,
771 }
772 }
773
774 pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
775 match self {
776 Self::SyncRegions(sync_regions) => Some(sync_regions),
777 _ => None,
778 }
779 }
780}
781
782#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
784pub struct UpgradeRegionReply {
785 #[serde(default)]
788 pub region_id: RegionId,
789 pub ready: bool,
791 pub exists: bool,
793 pub error: Option<String>,
795}
796
797impl Display for UpgradeRegionReply {
798 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
799 write!(
800 f,
801 "(ready={}, exists={}, error={:?})",
802 self.ready, self.exists, self.error
803 )
804 }
805}
806
807#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
808pub struct DowngradeRegionsReply {
809 pub replies: Vec<DowngradeRegionReply>,
810}
811
812impl DowngradeRegionsReply {
813 pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
814 Self { replies }
815 }
816
817 pub fn single(reply: DowngradeRegionReply) -> Self {
818 Self::new(vec![reply])
819 }
820}
821
822#[derive(Deserialize)]
823#[serde(untagged)]
824enum DowngradeRegionsCompat {
825 Single(DowngradeRegionReply),
826 Multiple(DowngradeRegionsReply),
827}
828
829fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
830where
831 D: Deserializer<'de>,
832{
833 let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
834 Ok(match helper {
835 DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
836 DowngradeRegionsCompat::Multiple(reply) => reply,
837 })
838}
839
840#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
841pub struct UpgradeRegionsReply {
842 pub replies: Vec<UpgradeRegionReply>,
843}
844
845impl UpgradeRegionsReply {
846 pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
847 Self { replies }
848 }
849
850 pub fn single(reply: UpgradeRegionReply) -> Self {
851 Self::new(vec![reply])
852 }
853}
854
855#[derive(Deserialize)]
856#[serde(untagged)]
857enum UpgradeRegionsCompat {
858 Single(UpgradeRegionReply),
859 Multiple(UpgradeRegionsReply),
860}
861
862fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
863where
864 D: Deserializer<'de>,
865{
866 let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
867 Ok(match helper {
868 UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
869 UpgradeRegionsCompat::Multiple(reply) => reply,
870 })
871}
872
873#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
874pub struct EnterStagingRegionReply {
875 pub region_id: RegionId,
876 pub ready: bool,
878 pub exists: bool,
880 pub error: Option<String>,
882}
883
884#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
885pub struct EnterStagingRegionsReply {
886 pub replies: Vec<EnterStagingRegionReply>,
887}
888
889impl EnterStagingRegionsReply {
890 pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
891 Self { replies }
892 }
893}
894
895#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
897pub struct SyncRegionReply {
898 pub region_id: RegionId,
900 pub ready: bool,
902 pub exists: bool,
904 pub error: Option<String>,
906}
907
908#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
910pub struct SyncRegionsReply {
911 pub replies: Vec<SyncRegionReply>,
912}
913
914impl SyncRegionsReply {
915 pub fn new(replies: Vec<SyncRegionReply>) -> Self {
916 Self { replies }
917 }
918}
919
920#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
921pub struct RemapManifestReply {
922 pub exists: bool,
924 pub manifest_paths: HashMap<RegionId, String>,
926 pub error: Option<String>,
928}
929
930impl Display for RemapManifestReply {
931 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
932 write!(
933 f,
934 "RemapManifestReply(manifest_paths={:?}, error={:?})",
935 self.manifest_paths, self.error
936 )
937 }
938}
939
940#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
941pub struct ApplyStagingManifestsReply {
942 pub replies: Vec<ApplyStagingManifestReply>,
943}
944
945impl ApplyStagingManifestsReply {
946 pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
947 Self { replies }
948 }
949}
950
951#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
952pub struct ApplyStagingManifestReply {
953 pub region_id: RegionId,
954 pub ready: bool,
956 pub exists: bool,
958 pub error: Option<String>,
960}
961
962#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
963#[serde(tag = "type", rename_all = "snake_case")]
964pub enum InstructionReply {
965 #[serde(alias = "open_region")]
966 OpenRegions(SimpleReply),
967 #[serde(alias = "close_region")]
968 CloseRegions(SimpleReply),
969 #[serde(
970 deserialize_with = "upgrade_regions_compat_from",
971 alias = "upgrade_region"
972 )]
973 UpgradeRegions(UpgradeRegionsReply),
974 #[serde(
975 alias = "downgrade_region",
976 deserialize_with = "downgrade_regions_compat_from"
977 )]
978 DowngradeRegions(DowngradeRegionsReply),
979 FlushRegions(FlushRegionReply),
980 GetFileRefs(GetFileRefsReply),
981 GcRegions(GcRegionsReply),
982 EnterStagingRegions(EnterStagingRegionsReply),
983 SyncRegions(SyncRegionsReply),
984 RemapManifest(RemapManifestReply),
985 ApplyStagingManifests(ApplyStagingManifestsReply),
986}
987
988impl Display for InstructionReply {
989 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
990 match self {
991 Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
992 Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
993 Self::UpgradeRegions(reply) => {
994 write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
995 }
996 Self::DowngradeRegions(reply) => {
997 write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
998 }
999 Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
1000 Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
1001 Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
1002 Self::EnterStagingRegions(reply) => {
1003 write!(
1004 f,
1005 "InstructionReply::EnterStagingRegions({:?})",
1006 reply.replies
1007 )
1008 }
1009 Self::SyncRegions(reply) => {
1010 write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
1011 }
1012 Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
1013 Self::ApplyStagingManifests(reply) => write!(
1014 f,
1015 "InstructionReply::ApplyStagingManifests({:?})",
1016 reply.replies
1017 ),
1018 }
1019 }
1020}
1021
1022#[cfg(any(test, feature = "testing"))]
1023impl InstructionReply {
1024 pub fn expect_close_regions_reply(self) -> SimpleReply {
1025 match self {
1026 Self::CloseRegions(reply) => reply,
1027 _ => panic!("Expected CloseRegions reply"),
1028 }
1029 }
1030
1031 pub fn expect_open_regions_reply(self) -> SimpleReply {
1032 match self {
1033 Self::OpenRegions(reply) => reply,
1034 _ => panic!("Expected OpenRegions reply"),
1035 }
1036 }
1037
1038 pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
1039 match self {
1040 Self::UpgradeRegions(reply) => reply.replies,
1041 _ => panic!("Expected UpgradeRegion reply"),
1042 }
1043 }
1044
1045 pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
1046 match self {
1047 Self::DowngradeRegions(reply) => reply.replies,
1048 _ => panic!("Expected DowngradeRegion reply"),
1049 }
1050 }
1051
1052 pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
1053 match self {
1054 Self::FlushRegions(reply) => reply,
1055 _ => panic!("Expected FlushRegions reply"),
1056 }
1057 }
1058
1059 pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
1060 match self {
1061 Self::EnterStagingRegions(reply) => reply.replies,
1062 _ => panic!("Expected EnterStagingRegion reply"),
1063 }
1064 }
1065
1066 pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
1067 match self {
1068 Self::SyncRegions(reply) => reply.replies,
1069 _ => panic!("Expected SyncRegion reply"),
1070 }
1071 }
1072
1073 pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
1074 match self {
1075 Self::RemapManifest(reply) => reply,
1076 _ => panic!("Expected RemapManifest reply"),
1077 }
1078 }
1079
1080 pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
1081 match self {
1082 Self::ApplyStagingManifests(reply) => reply.replies,
1083 _ => panic!("Expected ApplyStagingManifest reply"),
1084 }
1085 }
1086}
1087
1088#[cfg(test)]
1089mod tests {
1090 use std::collections::HashSet;
1091
1092 use store_api::storage::{FileId, FileRef};
1093
1094 use super::*;
1095
1096 #[test]
1097 fn test_serialize_instruction() {
1098 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1099 RegionIdent {
1100 datanode_id: 2,
1101 table_id: 1024,
1102 region_number: 1,
1103 engine: "mito2".to_string(),
1104 },
1105 "test/foo",
1106 HashMap::new(),
1107 HashMap::new(),
1108 false,
1109 )]);
1110
1111 let serialized = serde_json::to_string(&open_region).unwrap();
1112 assert_eq!(
1113 r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
1114 serialized
1115 );
1116
1117 let close_region = Instruction::CloseRegions(vec![RegionIdent {
1118 datanode_id: 2,
1119 table_id: 1024,
1120 region_number: 1,
1121 engine: "mito2".to_string(),
1122 }]);
1123
1124 let serialized = serde_json::to_string(&close_region).unwrap();
1125 assert_eq!(
1126 r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
1127 serialized
1128 );
1129
1130 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1131 region_id: RegionId::new(1024, 1),
1132 last_entry_id: None,
1133 metadata_last_entry_id: None,
1134 replay_timeout: Duration::from_millis(1000),
1135 location_id: None,
1136 replay_entry_id: None,
1137 metadata_replay_entry_id: None,
1138 }]);
1139
1140 let serialized = serde_json::to_string(&upgrade_region).unwrap();
1141 assert_eq!(
1142 r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
1143 serialized
1144 );
1145 }
1146
1147 #[test]
1148 fn test_serialize_instruction_reply() {
1149 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1150 DowngradeRegionsReply::single(DowngradeRegionReply {
1151 region_id: RegionId::new(1024, 1),
1152 last_entry_id: None,
1153 metadata_last_entry_id: None,
1154 exists: true,
1155 error: None,
1156 }),
1157 );
1158
1159 let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
1160 assert_eq!(
1161 r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
1162 serialized
1163 );
1164
1165 let upgrade_region_reply =
1166 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1167 region_id: RegionId::new(1024, 1),
1168 ready: true,
1169 exists: true,
1170 error: None,
1171 }));
1172 let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
1173 assert_eq!(
1174 r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
1175 serialized
1176 );
1177 }
1178
1179 #[test]
1180 fn test_deserialize_instruction() {
1181 let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
1183 let open_region_instruction: Instruction =
1184 serde_json::from_str(open_region_instruction).unwrap();
1185 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1186 RegionIdent {
1187 datanode_id: 2,
1188 table_id: 1024,
1189 region_number: 1,
1190 engine: "mito2".to_string(),
1191 },
1192 "test/foo",
1193 HashMap::new(),
1194 HashMap::new(),
1195 false,
1196 )]);
1197 assert_eq!(open_region_instruction, open_region);
1198
1199 let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
1201 let close_region_instruction: Instruction =
1202 serde_json::from_str(close_region_instruction).unwrap();
1203 let close_region = Instruction::CloseRegions(vec![RegionIdent {
1204 datanode_id: 2,
1205 table_id: 1024,
1206 region_number: 1,
1207 engine: "mito2".to_string(),
1208 }]);
1209 assert_eq!(close_region_instruction, close_region);
1210
1211 let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
1213 let downgrade_region_instruction: Instruction =
1214 serde_json::from_str(downgrade_region_instruction).unwrap();
1215 let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
1216 region_id: RegionId::new(1024, 1),
1217 flush_timeout: Some(Duration::from_millis(1000)),
1218 }]);
1219 assert_eq!(downgrade_region_instruction, downgrade_region);
1220
1221 let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
1223 let upgrade_region_instruction: Instruction =
1224 serde_json::from_str(upgrade_region_instruction).unwrap();
1225 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1226 region_id: RegionId::new(1024, 1),
1227 last_entry_id: None,
1228 metadata_last_entry_id: None,
1229 replay_timeout: Duration::from_millis(1000),
1230 location_id: None,
1231 replay_entry_id: None,
1232 metadata_replay_entry_id: None,
1233 }]);
1234 assert_eq!(upgrade_region_instruction, upgrade_region);
1235 }
1236
1237 #[test]
1238 fn test_deserialize_instruction_reply() {
1239 let close_region_instruction_reply =
1241 r#"{"result":true,"error":null,"type":"close_region"}"#;
1242 let close_region_instruction_reply: InstructionReply =
1243 serde_json::from_str(close_region_instruction_reply).unwrap();
1244 let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
1245 result: true,
1246 error: None,
1247 });
1248 assert_eq!(close_region_instruction_reply, close_region_reply);
1249
1250 let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
1252 let open_region_instruction_reply: InstructionReply =
1253 serde_json::from_str(open_region_instruction_reply).unwrap();
1254 let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1255 result: true,
1256 error: None,
1257 });
1258 assert_eq!(open_region_instruction_reply, open_region_reply);
1259
1260 let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1262 let downgrade_region_instruction_reply: InstructionReply =
1263 serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1264 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1265 DowngradeRegionsReply::single(DowngradeRegionReply {
1266 region_id: RegionId::new(1024, 1),
1267 last_entry_id: None,
1268 metadata_last_entry_id: None,
1269 exists: true,
1270 error: None,
1271 }),
1272 );
1273 assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1274
1275 let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1277 let upgrade_region_instruction_reply: InstructionReply =
1278 serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1279 let upgrade_region_reply =
1280 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1281 region_id: RegionId::new(1024, 1),
1282 ready: true,
1283 exists: true,
1284 error: None,
1285 }));
1286 assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1287 }
1288
1289 #[test]
1290 fn test_enter_staging_partition_rule_compatibility() {
1291 let legacy = r#"{"region_id":4398046511105,"partition_expr":"{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"}"#;
1292 let enter: EnterStagingRegion = serde_json::from_str(legacy).unwrap();
1293 assert_eq!(enter.region_id, RegionId::new(1024, 1));
1294 assert_eq!(
1295 enter.partition_directive,
1296 StagingPartitionDirective::UpdatePartitionExpr(
1297 "{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"
1298 .to_string()
1299 )
1300 );
1301
1302 let serialized = serde_json::to_string(&enter).unwrap();
1303 assert!(serialized.contains("\"partition_directive\":\""));
1304 assert!(!serialized.contains("partition_expr"));
1305
1306 let reject = r#"{"region_id":4398046511105,"partition_expr":{"type":"reject_all_writes"}}"#;
1307 let enter: EnterStagingRegion = serde_json::from_str(reject).unwrap();
1308 assert_eq!(
1309 enter.partition_directive,
1310 StagingPartitionDirective::RejectAllWrites
1311 );
1312 }
1313
1314 #[derive(Debug, Clone, Serialize, Deserialize)]
1315 struct LegacyOpenRegion {
1316 region_ident: RegionIdent,
1317 region_storage_path: String,
1318 region_options: HashMap<String, String>,
1319 }
1320
1321 #[test]
1322 fn test_compatible_serialize_open_region() {
1323 let region_ident = RegionIdent {
1324 datanode_id: 2,
1325 table_id: 1024,
1326 region_number: 1,
1327 engine: "mito2".to_string(),
1328 };
1329 let region_storage_path = "test/foo".to_string();
1330 let region_options = HashMap::from([
1331 ("a".to_string(), "aa".to_string()),
1332 ("b".to_string(), "bb".to_string()),
1333 ]);
1334
1335 let legacy_open_region = LegacyOpenRegion {
1337 region_ident: region_ident.clone(),
1338 region_storage_path: region_storage_path.clone(),
1339 region_options: region_options.clone(),
1340 };
1341 let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1342
1343 let deserialized = serde_json::from_str(&serialized).unwrap();
1345 let expected = OpenRegion {
1346 region_ident,
1347 region_storage_path,
1348 region_options,
1349 region_wal_options: HashMap::new(),
1350 skip_wal_replay: false,
1351 };
1352 assert_eq!(expected, deserialized);
1353 }
1354
1355 #[test]
1356 fn test_flush_regions_creation() {
1357 let region_id = RegionId::new(1024, 1);
1358
1359 let single_sync = FlushRegions::sync_single(region_id);
1361 assert_eq!(single_sync.region_ids, vec![region_id]);
1362 assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1363 assert!(!single_sync.is_hint());
1364 assert!(single_sync.is_sync());
1365 assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1366 assert!(single_sync.is_single_region());
1367 assert_eq!(single_sync.single_region_id(), Some(region_id));
1368
1369 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1371 let batch_async = FlushRegions::async_batch(region_ids.clone());
1372 assert_eq!(batch_async.region_ids, region_ids);
1373 assert_eq!(batch_async.strategy, FlushStrategy::Async);
1374 assert!(batch_async.is_hint());
1375 assert!(!batch_async.is_sync());
1376 assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1377 assert!(!batch_async.is_single_region());
1378 assert_eq!(batch_async.single_region_id(), None);
1379
1380 let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1382 assert_eq!(batch_sync.region_ids, region_ids);
1383 assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1384 assert!(!batch_sync.is_hint());
1385 assert!(batch_sync.is_sync());
1386 assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1387 }
1388
1389 #[test]
1390 fn test_flush_regions_conversion() {
1391 let region_id = RegionId::new(1024, 1);
1392
1393 let from_region_id: FlushRegions = region_id.into();
1394 assert_eq!(from_region_id.region_ids, vec![region_id]);
1395 assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1396 assert!(!from_region_id.is_hint());
1397 assert!(from_region_id.is_sync());
1398
1399 let flush_regions = FlushRegions {
1401 region_ids: vec![region_id],
1402 strategy: FlushStrategy::Async,
1403 error_strategy: FlushErrorStrategy::TryAll,
1404 };
1405 assert_eq!(flush_regions.region_ids, vec![region_id]);
1406 assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1407 assert!(flush_regions.is_hint());
1408 assert!(!flush_regions.is_sync());
1409 }
1410
1411 #[test]
1412 fn test_flush_region_reply() {
1413 let region_id = RegionId::new(1024, 1);
1414
1415 let success_reply = FlushRegionReply::success_single(region_id);
1417 assert!(success_reply.overall_success);
1418 assert_eq!(success_reply.results.len(), 1);
1419 assert_eq!(success_reply.results[0].0, region_id);
1420 assert!(success_reply.results[0].1.is_ok());
1421
1422 let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1424 assert!(!error_reply.overall_success);
1425 assert_eq!(error_reply.results.len(), 1);
1426 assert_eq!(error_reply.results[0].0, region_id);
1427 assert!(error_reply.results[0].1.is_err());
1428
1429 let region_id2 = RegionId::new(1024, 2);
1431 let results = vec![
1432 (region_id, Ok(())),
1433 (region_id2, Err("flush failed".to_string())),
1434 ];
1435 let batch_reply = FlushRegionReply::from_results(results);
1436 assert!(!batch_reply.overall_success);
1437 assert_eq!(batch_reply.results.len(), 2);
1438
1439 let simple_reply = batch_reply.to_simple_reply();
1441 assert!(!simple_reply.result);
1442 assert!(simple_reply.error.is_some());
1443 assert!(simple_reply.error.unwrap().contains("flush failed"));
1444 }
1445
1446 #[test]
1447 fn test_serialize_flush_regions_instruction() {
1448 let region_id = RegionId::new(1024, 1);
1449 let flush_regions = FlushRegions::sync_single(region_id);
1450 let instruction = Instruction::FlushRegions(flush_regions.clone());
1451
1452 let serialized = serde_json::to_string(&instruction).unwrap();
1453 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1454
1455 match deserialized {
1456 Instruction::FlushRegions(fr) => {
1457 assert_eq!(fr.region_ids, vec![region_id]);
1458 assert_eq!(fr.strategy, FlushStrategy::Sync);
1459 assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1460 }
1461 _ => panic!("Expected FlushRegions instruction"),
1462 }
1463 }
1464
1465 #[test]
1466 fn test_serialize_flush_regions_batch_instruction() {
1467 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1468 let flush_regions =
1469 FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1470 let instruction = Instruction::FlushRegions(flush_regions);
1471
1472 let serialized = serde_json::to_string(&instruction).unwrap();
1473 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1474
1475 match deserialized {
1476 Instruction::FlushRegions(fr) => {
1477 assert_eq!(fr.region_ids, region_ids);
1478 assert_eq!(fr.strategy, FlushStrategy::Sync);
1479 assert!(!fr.is_hint());
1480 assert!(fr.is_sync());
1481 assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1482 }
1483 _ => panic!("Expected FlushRegions instruction"),
1484 }
1485 }
1486
1487 #[test]
1488 fn test_serialize_get_file_refs_instruction_reply() {
1489 let mut manifest = FileRefsManifest::default();
1490 let r0 = RegionId::new(1024, 1);
1491 let r1 = RegionId::new(1024, 2);
1492 manifest.file_refs.insert(
1493 r0,
1494 HashSet::from([FileRef::new(r0, FileId::random(), None)]),
1495 );
1496 manifest.file_refs.insert(
1497 r1,
1498 HashSet::from([FileRef::new(r1, FileId::random(), None)]),
1499 );
1500 manifest.manifest_version.insert(r0, 10);
1501 manifest.manifest_version.insert(r1, 20);
1502
1503 let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1504 file_refs_manifest: manifest,
1505 success: true,
1506 error: None,
1507 });
1508
1509 let serialized = serde_json::to_string(&instruction_reply).unwrap();
1510 let deserialized = serde_json::from_str(&serialized).unwrap();
1511
1512 assert_eq!(instruction_reply, deserialized);
1513 }
1514}