1use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::region_engine::SyncRegionFromRequest;
21use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
22use strum::Display;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::flow_name::FlowName;
27use crate::key::schema_name::SchemaName;
28use crate::key::{FlowId, FlowPartitionId};
29use crate::peer::Peer;
30use crate::{DatanodeId, FlownodeId};
31
32#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
33pub struct RegionIdent {
34 pub datanode_id: DatanodeId,
35 pub table_id: TableId,
36 pub region_number: RegionNumber,
37 pub engine: String,
38}
39
40impl RegionIdent {
41 pub fn get_region_id(&self) -> RegionId {
42 RegionId::new(self.table_id, self.region_number)
43 }
44}
45
46impl Display for RegionIdent {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 write!(
49 f,
50 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
51 self.datanode_id, self.table_id, self.region_number, self.engine
52 )
53 }
54}
55
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
58pub struct DowngradeRegionReply {
59 #[serde(default)]
62 pub region_id: RegionId,
63 pub last_entry_id: Option<u64>,
65 pub metadata_last_entry_id: Option<u64>,
67 pub exists: bool,
69 pub error: Option<String>,
71}
72
73impl Display for DowngradeRegionReply {
74 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75 write!(
76 f,
77 "(last_entry_id={:?}, exists={}, error={:?})",
78 self.last_entry_id, self.exists, self.error
79 )
80 }
81}
82
83#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
84pub struct SimpleReply {
85 pub result: bool,
86 pub error: Option<String>,
87}
88
89#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
91pub struct FlushRegionReply {
92 pub results: Vec<(RegionId, Result<(), String>)>,
96 pub overall_success: bool,
98}
99
100impl FlushRegionReply {
101 pub fn success_single(region_id: RegionId) -> Self {
103 Self {
104 results: vec![(region_id, Ok(()))],
105 overall_success: true,
106 }
107 }
108
109 pub fn error_single(region_id: RegionId, error: String) -> Self {
111 Self {
112 results: vec![(region_id, Err(error))],
113 overall_success: false,
114 }
115 }
116
117 pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
119 let overall_success = results.iter().all(|(_, result)| result.is_ok());
120 Self {
121 results,
122 overall_success,
123 }
124 }
125
126 pub fn to_simple_reply(&self) -> SimpleReply {
128 if self.overall_success {
129 SimpleReply {
130 result: true,
131 error: None,
132 }
133 } else {
134 let errors: Vec<String> = self
135 .results
136 .iter()
137 .filter_map(|(region_id, result)| {
138 result
139 .as_ref()
140 .err()
141 .map(|err| format!("{}: {}", region_id, err))
142 })
143 .collect();
144 SimpleReply {
145 result: false,
146 error: Some(errors.join("; ")),
147 }
148 }
149 }
150}
151
152impl Display for SimpleReply {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(f, "(result={}, error={:?})", self.result, self.error)
155 }
156}
157
158impl Display for FlushRegionReply {
159 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
160 let results_str = self
161 .results
162 .iter()
163 .map(|(region_id, result)| match result {
164 Ok(()) => format!("{}:OK", region_id),
165 Err(err) => format!("{}:ERR({})", region_id, err),
166 })
167 .collect::<Vec<_>>()
168 .join(", ");
169 write!(
170 f,
171 "(overall_success={}, results=[{}])",
172 self.overall_success, results_str
173 )
174 }
175}
176
177impl Display for OpenRegion {
178 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
179 write!(
180 f,
181 "OpenRegion(region_ident={}, region_storage_path={})",
182 self.region_ident, self.region_storage_path
183 )
184 }
185}
186
187#[serde_with::serde_as]
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub struct OpenRegion {
190 pub region_ident: RegionIdent,
191 pub region_storage_path: String,
192 pub region_options: HashMap<String, String>,
193 #[serde(default)]
194 #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
195 pub region_wal_options: HashMap<RegionNumber, String>,
196 #[serde(default)]
197 pub skip_wal_replay: bool,
198}
199
200impl OpenRegion {
201 pub fn new(
202 region_ident: RegionIdent,
203 path: &str,
204 region_options: HashMap<String, String>,
205 region_wal_options: HashMap<RegionNumber, String>,
206 skip_wal_replay: bool,
207 ) -> Self {
208 Self {
209 region_ident,
210 region_storage_path: path.to_string(),
211 region_options,
212 region_wal_options,
213 skip_wal_replay,
214 }
215 }
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
220pub struct DowngradeRegion {
221 pub region_id: RegionId,
223 #[serde(default)]
227 pub flush_timeout: Option<Duration>,
228}
229
230impl Display for DowngradeRegion {
231 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232 write!(
233 f,
234 "DowngradeRegion(region_id={}, flush_timeout={:?})",
235 self.region_id, self.flush_timeout,
236 )
237 }
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
242pub struct UpgradeRegion {
243 pub region_id: RegionId,
245 pub last_entry_id: Option<u64>,
247 pub metadata_last_entry_id: Option<u64>,
249 #[serde(with = "humantime_serde")]
254 pub replay_timeout: Duration,
255 #[serde(default)]
257 pub location_id: Option<u64>,
258 #[serde(default, skip_serializing_if = "Option::is_none")]
259 pub replay_entry_id: Option<u64>,
260 #[serde(default, skip_serializing_if = "Option::is_none")]
261 pub metadata_replay_entry_id: Option<u64>,
262}
263
264impl UpgradeRegion {
265 pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
267 self.replay_entry_id = replay_entry_id;
268 self
269 }
270
271 pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
273 self.metadata_replay_entry_id = metadata_replay_entry_id;
274 self
275 }
276}
277
278#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
279pub enum CacheIdent {
281 FlowId(FlowId),
282 FlowNodeAddressChange(u64),
284 FlowName(FlowName),
285 TableId(TableId),
286 TableName(TableName),
287 SchemaName(SchemaName),
288 CreateFlow(CreateFlow),
289 DropFlow(DropFlow),
290}
291
292#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
293pub struct CreateFlow {
294 pub flow_id: FlowId,
296 pub source_table_ids: Vec<TableId>,
297 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
299}
300
301#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
302pub struct DropFlow {
303 pub flow_id: FlowId,
304 pub source_table_ids: Vec<TableId>,
305 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
311pub enum FlushStrategy {
312 #[default]
314 Sync,
315 Async,
317}
318
319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
321pub enum FlushErrorStrategy {
322 #[default]
324 FailFast,
325 TryAll,
327}
328
329#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
332pub struct FlushRegions {
333 pub region_ids: Vec<RegionId>,
335 #[serde(default)]
337 pub strategy: FlushStrategy,
338 #[serde(default)]
340 pub error_strategy: FlushErrorStrategy,
341}
342
343impl Display for FlushRegions {
344 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
345 write!(
346 f,
347 "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
348 self.region_ids, self.strategy, self.error_strategy
349 )
350 }
351}
352
353impl FlushRegions {
354 pub fn sync_single(region_id: RegionId) -> Self {
356 Self {
357 region_ids: vec![region_id],
358 strategy: FlushStrategy::Sync,
359 error_strategy: FlushErrorStrategy::FailFast,
360 }
361 }
362
363 pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
365 Self {
366 region_ids,
367 strategy: FlushStrategy::Async,
368 error_strategy: FlushErrorStrategy::TryAll,
369 }
370 }
371
372 pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
374 Self {
375 region_ids,
376 strategy: FlushStrategy::Sync,
377 error_strategy,
378 }
379 }
380
381 pub fn is_single_region(&self) -> bool {
383 self.region_ids.len() == 1
384 }
385
386 pub fn single_region_id(&self) -> Option<RegionId> {
388 if self.is_single_region() {
389 self.region_ids.first().copied()
390 } else {
391 None
392 }
393 }
394
395 pub fn is_hint(&self) -> bool {
397 matches!(self.strategy, FlushStrategy::Async)
398 }
399
400 pub fn is_sync(&self) -> bool {
402 matches!(self.strategy, FlushStrategy::Sync)
403 }
404}
405
406impl From<RegionId> for FlushRegions {
407 fn from(region_id: RegionId) -> Self {
408 Self::sync_single(region_id)
409 }
410}
411
412#[derive(Debug, Deserialize)]
413#[serde(untagged)]
414enum SingleOrMultiple<T> {
415 Single(T),
416 Multiple(Vec<T>),
417}
418
419fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
420where
421 D: Deserializer<'de>,
422 T: Deserialize<'de>,
423{
424 let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
425 Ok(match helper {
426 SingleOrMultiple::Single(x) => vec![x],
427 SingleOrMultiple::Multiple(xs) => xs,
428 })
429}
430
431#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
433pub struct GetFileRefs {
434 pub query_regions: Vec<RegionId>,
436 pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
441}
442
443impl Display for GetFileRefs {
444 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445 write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
446 }
447}
448
449#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
451pub struct GcRegions {
452 pub regions: Vec<RegionId>,
454 pub file_refs_manifest: FileRefsManifest,
456 pub full_file_listing: bool,
458}
459
460impl Display for GcRegions {
461 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
462 write!(
463 f,
464 "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
465 self.regions,
466 self.file_refs_manifest.file_refs.len(),
467 self.full_file_listing
468 )
469 }
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
474pub struct GetFileRefsReply {
475 pub file_refs_manifest: FileRefsManifest,
477 pub success: bool,
479 pub error: Option<String>,
481}
482
483impl Display for GetFileRefsReply {
484 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
485 write!(
486 f,
487 "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
488 self.success,
489 self.file_refs_manifest.file_refs.len(),
490 self.error
491 )
492 }
493}
494
495#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
497pub struct GcRegionsReply {
498 pub result: Result<GcReport, String>,
499}
500
501impl Display for GcRegionsReply {
502 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
503 write!(
504 f,
505 "GcReply(result={})",
506 match &self.result {
507 Ok(report) => format!(
508 "GcReport(deleted_files_count={}, need_retry_regions_count={})",
509 report.deleted_files.len(),
510 report.need_retry_regions.len()
511 ),
512 Err(err) => format!("Err({})", err),
513 }
514 )
515 }
516}
517
518#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
519pub struct EnterStagingRegion {
520 pub region_id: RegionId,
521 pub partition_expr: String,
522}
523
524impl Display for EnterStagingRegion {
525 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
526 write!(
527 f,
528 "EnterStagingRegion(region_id={}, partition_expr={})",
529 self.region_id, self.partition_expr
530 )
531 }
532}
533
534#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
536pub struct SyncRegion {
537 pub region_id: RegionId,
539 pub request: SyncRegionFromRequest,
541}
542
543impl Display for SyncRegion {
544 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
545 write!(
546 f,
547 "SyncRegion(region_id={}, request={:?})",
548 self.region_id, self.request
549 )
550 }
551}
552
553#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
554pub struct RemapManifest {
555 pub region_id: RegionId,
556 pub input_regions: Vec<RegionId>,
558 pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
560 pub new_partition_exprs: HashMap<RegionId, String>,
562}
563
564impl Display for RemapManifest {
565 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
566 write!(
567 f,
568 "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
569 self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
570 )
571 }
572}
573
574#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
575pub struct ApplyStagingManifest {
576 pub region_id: RegionId,
578 pub partition_expr: String,
580 pub central_region_id: RegionId,
582 pub manifest_path: String,
584}
585
586impl Display for ApplyStagingManifest {
587 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
588 write!(
589 f,
590 "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
591 self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
592 )
593 }
594}
595
596#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
597pub enum Instruction {
598 #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
600 OpenRegions(Vec<OpenRegion>),
601 #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
603 CloseRegions(Vec<RegionIdent>),
604 #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
606 UpgradeRegions(Vec<UpgradeRegion>),
607 #[serde(
608 deserialize_with = "single_or_multiple_from",
609 alias = "DowngradeRegion"
610 )]
611 DowngradeRegions(Vec<DowngradeRegion>),
613 InvalidateCaches(Vec<CacheIdent>),
615 FlushRegions(FlushRegions),
617 GetFileRefs(GetFileRefs),
619 GcRegions(GcRegions),
621 Suspend,
623 EnterStagingRegions(Vec<EnterStagingRegion>),
625 SyncRegions(Vec<SyncRegion>),
627 RemapManifest(RemapManifest),
629
630 ApplyStagingManifests(Vec<ApplyStagingManifest>),
632}
633
634impl Instruction {
635 pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
637 match self {
638 Self::OpenRegions(open_regions) => Some(open_regions),
639 _ => None,
640 }
641 }
642
643 pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
645 match self {
646 Self::CloseRegions(close_regions) => Some(close_regions),
647 _ => None,
648 }
649 }
650
651 pub fn into_flush_regions(self) -> Option<FlushRegions> {
653 match self {
654 Self::FlushRegions(flush_regions) => Some(flush_regions),
655 _ => None,
656 }
657 }
658
659 pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
661 match self {
662 Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
663 _ => None,
664 }
665 }
666
667 pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
669 match self {
670 Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
671 _ => None,
672 }
673 }
674
675 pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
676 match self {
677 Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
678 _ => None,
679 }
680 }
681
682 pub fn into_gc_regions(self) -> Option<GcRegions> {
683 match self {
684 Self::GcRegions(gc_regions) => Some(gc_regions),
685 _ => None,
686 }
687 }
688
689 pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
690 match self {
691 Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
692 _ => None,
693 }
694 }
695
696 pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
697 match self {
698 Self::SyncRegions(sync_regions) => Some(sync_regions),
699 _ => None,
700 }
701 }
702}
703
704#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
706pub struct UpgradeRegionReply {
707 #[serde(default)]
710 pub region_id: RegionId,
711 pub ready: bool,
713 pub exists: bool,
715 pub error: Option<String>,
717}
718
719impl Display for UpgradeRegionReply {
720 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
721 write!(
722 f,
723 "(ready={}, exists={}, error={:?})",
724 self.ready, self.exists, self.error
725 )
726 }
727}
728
729#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
730pub struct DowngradeRegionsReply {
731 pub replies: Vec<DowngradeRegionReply>,
732}
733
734impl DowngradeRegionsReply {
735 pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
736 Self { replies }
737 }
738
739 pub fn single(reply: DowngradeRegionReply) -> Self {
740 Self::new(vec![reply])
741 }
742}
743
744#[derive(Deserialize)]
745#[serde(untagged)]
746enum DowngradeRegionsCompat {
747 Single(DowngradeRegionReply),
748 Multiple(DowngradeRegionsReply),
749}
750
751fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
752where
753 D: Deserializer<'de>,
754{
755 let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
756 Ok(match helper {
757 DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
758 DowngradeRegionsCompat::Multiple(reply) => reply,
759 })
760}
761
762#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
763pub struct UpgradeRegionsReply {
764 pub replies: Vec<UpgradeRegionReply>,
765}
766
767impl UpgradeRegionsReply {
768 pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
769 Self { replies }
770 }
771
772 pub fn single(reply: UpgradeRegionReply) -> Self {
773 Self::new(vec![reply])
774 }
775}
776
777#[derive(Deserialize)]
778#[serde(untagged)]
779enum UpgradeRegionsCompat {
780 Single(UpgradeRegionReply),
781 Multiple(UpgradeRegionsReply),
782}
783
784fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
785where
786 D: Deserializer<'de>,
787{
788 let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
789 Ok(match helper {
790 UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
791 UpgradeRegionsCompat::Multiple(reply) => reply,
792 })
793}
794
795#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
796pub struct EnterStagingRegionReply {
797 pub region_id: RegionId,
798 pub ready: bool,
800 pub exists: bool,
802 pub error: Option<String>,
804}
805
806#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
807pub struct EnterStagingRegionsReply {
808 pub replies: Vec<EnterStagingRegionReply>,
809}
810
811impl EnterStagingRegionsReply {
812 pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
813 Self { replies }
814 }
815}
816
817#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
819pub struct SyncRegionReply {
820 pub region_id: RegionId,
822 pub ready: bool,
824 pub exists: bool,
826 pub error: Option<String>,
828}
829
830#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
832pub struct SyncRegionsReply {
833 pub replies: Vec<SyncRegionReply>,
834}
835
836impl SyncRegionsReply {
837 pub fn new(replies: Vec<SyncRegionReply>) -> Self {
838 Self { replies }
839 }
840}
841
842#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
843pub struct RemapManifestReply {
844 pub exists: bool,
846 pub manifest_paths: HashMap<RegionId, String>,
848 pub error: Option<String>,
850}
851
852impl Display for RemapManifestReply {
853 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
854 write!(
855 f,
856 "RemapManifestReply(manifest_paths={:?}, error={:?})",
857 self.manifest_paths, self.error
858 )
859 }
860}
861
862#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
863pub struct ApplyStagingManifestsReply {
864 pub replies: Vec<ApplyStagingManifestReply>,
865}
866
867impl ApplyStagingManifestsReply {
868 pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
869 Self { replies }
870 }
871}
872
873#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
874pub struct ApplyStagingManifestReply {
875 pub region_id: RegionId,
876 pub ready: bool,
878 pub exists: bool,
880 pub error: Option<String>,
882}
883
884#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
885#[serde(tag = "type", rename_all = "snake_case")]
886pub enum InstructionReply {
887 #[serde(alias = "open_region")]
888 OpenRegions(SimpleReply),
889 #[serde(alias = "close_region")]
890 CloseRegions(SimpleReply),
891 #[serde(
892 deserialize_with = "upgrade_regions_compat_from",
893 alias = "upgrade_region"
894 )]
895 UpgradeRegions(UpgradeRegionsReply),
896 #[serde(
897 alias = "downgrade_region",
898 deserialize_with = "downgrade_regions_compat_from"
899 )]
900 DowngradeRegions(DowngradeRegionsReply),
901 FlushRegions(FlushRegionReply),
902 GetFileRefs(GetFileRefsReply),
903 GcRegions(GcRegionsReply),
904 EnterStagingRegions(EnterStagingRegionsReply),
905 SyncRegions(SyncRegionsReply),
906 RemapManifest(RemapManifestReply),
907 ApplyStagingManifests(ApplyStagingManifestsReply),
908}
909
910impl Display for InstructionReply {
911 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
912 match self {
913 Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
914 Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
915 Self::UpgradeRegions(reply) => {
916 write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
917 }
918 Self::DowngradeRegions(reply) => {
919 write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
920 }
921 Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
922 Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
923 Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
924 Self::EnterStagingRegions(reply) => {
925 write!(
926 f,
927 "InstructionReply::EnterStagingRegions({:?})",
928 reply.replies
929 )
930 }
931 Self::SyncRegions(reply) => {
932 write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
933 }
934 Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
935 Self::ApplyStagingManifests(reply) => write!(
936 f,
937 "InstructionReply::ApplyStagingManifests({:?})",
938 reply.replies
939 ),
940 }
941 }
942}
943
944#[cfg(any(test, feature = "testing"))]
945impl InstructionReply {
946 pub fn expect_close_regions_reply(self) -> SimpleReply {
947 match self {
948 Self::CloseRegions(reply) => reply,
949 _ => panic!("Expected CloseRegions reply"),
950 }
951 }
952
953 pub fn expect_open_regions_reply(self) -> SimpleReply {
954 match self {
955 Self::OpenRegions(reply) => reply,
956 _ => panic!("Expected OpenRegions reply"),
957 }
958 }
959
960 pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
961 match self {
962 Self::UpgradeRegions(reply) => reply.replies,
963 _ => panic!("Expected UpgradeRegion reply"),
964 }
965 }
966
967 pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
968 match self {
969 Self::DowngradeRegions(reply) => reply.replies,
970 _ => panic!("Expected DowngradeRegion reply"),
971 }
972 }
973
974 pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
975 match self {
976 Self::FlushRegions(reply) => reply,
977 _ => panic!("Expected FlushRegions reply"),
978 }
979 }
980
981 pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
982 match self {
983 Self::EnterStagingRegions(reply) => reply.replies,
984 _ => panic!("Expected EnterStagingRegion reply"),
985 }
986 }
987
988 pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
989 match self {
990 Self::SyncRegions(reply) => reply.replies,
991 _ => panic!("Expected SyncRegion reply"),
992 }
993 }
994
995 pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
996 match self {
997 Self::RemapManifest(reply) => reply,
998 _ => panic!("Expected RemapManifest reply"),
999 }
1000 }
1001
1002 pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
1003 match self {
1004 Self::ApplyStagingManifests(reply) => reply.replies,
1005 _ => panic!("Expected ApplyStagingManifest reply"),
1006 }
1007 }
1008}
1009
1010#[cfg(test)]
1011mod tests {
1012 use std::collections::HashSet;
1013
1014 use store_api::storage::{FileId, FileRef};
1015
1016 use super::*;
1017
1018 #[test]
1019 fn test_serialize_instruction() {
1020 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1021 RegionIdent {
1022 datanode_id: 2,
1023 table_id: 1024,
1024 region_number: 1,
1025 engine: "mito2".to_string(),
1026 },
1027 "test/foo",
1028 HashMap::new(),
1029 HashMap::new(),
1030 false,
1031 )]);
1032
1033 let serialized = serde_json::to_string(&open_region).unwrap();
1034 assert_eq!(
1035 r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
1036 serialized
1037 );
1038
1039 let close_region = Instruction::CloseRegions(vec![RegionIdent {
1040 datanode_id: 2,
1041 table_id: 1024,
1042 region_number: 1,
1043 engine: "mito2".to_string(),
1044 }]);
1045
1046 let serialized = serde_json::to_string(&close_region).unwrap();
1047 assert_eq!(
1048 r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
1049 serialized
1050 );
1051
1052 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1053 region_id: RegionId::new(1024, 1),
1054 last_entry_id: None,
1055 metadata_last_entry_id: None,
1056 replay_timeout: Duration::from_millis(1000),
1057 location_id: None,
1058 replay_entry_id: None,
1059 metadata_replay_entry_id: None,
1060 }]);
1061
1062 let serialized = serde_json::to_string(&upgrade_region).unwrap();
1063 assert_eq!(
1064 r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
1065 serialized
1066 );
1067 }
1068
1069 #[test]
1070 fn test_serialize_instruction_reply() {
1071 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1072 DowngradeRegionsReply::single(DowngradeRegionReply {
1073 region_id: RegionId::new(1024, 1),
1074 last_entry_id: None,
1075 metadata_last_entry_id: None,
1076 exists: true,
1077 error: None,
1078 }),
1079 );
1080
1081 let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
1082 assert_eq!(
1083 r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
1084 serialized
1085 );
1086
1087 let upgrade_region_reply =
1088 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1089 region_id: RegionId::new(1024, 1),
1090 ready: true,
1091 exists: true,
1092 error: None,
1093 }));
1094 let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
1095 assert_eq!(
1096 r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
1097 serialized
1098 );
1099 }
1100
1101 #[test]
1102 fn test_deserialize_instruction() {
1103 let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
1105 let open_region_instruction: Instruction =
1106 serde_json::from_str(open_region_instruction).unwrap();
1107 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1108 RegionIdent {
1109 datanode_id: 2,
1110 table_id: 1024,
1111 region_number: 1,
1112 engine: "mito2".to_string(),
1113 },
1114 "test/foo",
1115 HashMap::new(),
1116 HashMap::new(),
1117 false,
1118 )]);
1119 assert_eq!(open_region_instruction, open_region);
1120
1121 let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
1123 let close_region_instruction: Instruction =
1124 serde_json::from_str(close_region_instruction).unwrap();
1125 let close_region = Instruction::CloseRegions(vec![RegionIdent {
1126 datanode_id: 2,
1127 table_id: 1024,
1128 region_number: 1,
1129 engine: "mito2".to_string(),
1130 }]);
1131 assert_eq!(close_region_instruction, close_region);
1132
1133 let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
1135 let downgrade_region_instruction: Instruction =
1136 serde_json::from_str(downgrade_region_instruction).unwrap();
1137 let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
1138 region_id: RegionId::new(1024, 1),
1139 flush_timeout: Some(Duration::from_millis(1000)),
1140 }]);
1141 assert_eq!(downgrade_region_instruction, downgrade_region);
1142
1143 let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
1145 let upgrade_region_instruction: Instruction =
1146 serde_json::from_str(upgrade_region_instruction).unwrap();
1147 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1148 region_id: RegionId::new(1024, 1),
1149 last_entry_id: None,
1150 metadata_last_entry_id: None,
1151 replay_timeout: Duration::from_millis(1000),
1152 location_id: None,
1153 replay_entry_id: None,
1154 metadata_replay_entry_id: None,
1155 }]);
1156 assert_eq!(upgrade_region_instruction, upgrade_region);
1157 }
1158
1159 #[test]
1160 fn test_deserialize_instruction_reply() {
1161 let close_region_instruction_reply =
1163 r#"{"result":true,"error":null,"type":"close_region"}"#;
1164 let close_region_instruction_reply: InstructionReply =
1165 serde_json::from_str(close_region_instruction_reply).unwrap();
1166 let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
1167 result: true,
1168 error: None,
1169 });
1170 assert_eq!(close_region_instruction_reply, close_region_reply);
1171
1172 let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
1174 let open_region_instruction_reply: InstructionReply =
1175 serde_json::from_str(open_region_instruction_reply).unwrap();
1176 let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1177 result: true,
1178 error: None,
1179 });
1180 assert_eq!(open_region_instruction_reply, open_region_reply);
1181
1182 let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1184 let downgrade_region_instruction_reply: InstructionReply =
1185 serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1186 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1187 DowngradeRegionsReply::single(DowngradeRegionReply {
1188 region_id: RegionId::new(1024, 1),
1189 last_entry_id: None,
1190 metadata_last_entry_id: None,
1191 exists: true,
1192 error: None,
1193 }),
1194 );
1195 assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1196
1197 let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1199 let upgrade_region_instruction_reply: InstructionReply =
1200 serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1201 let upgrade_region_reply =
1202 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1203 region_id: RegionId::new(1024, 1),
1204 ready: true,
1205 exists: true,
1206 error: None,
1207 }));
1208 assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1209 }
1210
1211 #[derive(Debug, Clone, Serialize, Deserialize)]
1212 struct LegacyOpenRegion {
1213 region_ident: RegionIdent,
1214 region_storage_path: String,
1215 region_options: HashMap<String, String>,
1216 }
1217
1218 #[test]
1219 fn test_compatible_serialize_open_region() {
1220 let region_ident = RegionIdent {
1221 datanode_id: 2,
1222 table_id: 1024,
1223 region_number: 1,
1224 engine: "mito2".to_string(),
1225 };
1226 let region_storage_path = "test/foo".to_string();
1227 let region_options = HashMap::from([
1228 ("a".to_string(), "aa".to_string()),
1229 ("b".to_string(), "bb".to_string()),
1230 ]);
1231
1232 let legacy_open_region = LegacyOpenRegion {
1234 region_ident: region_ident.clone(),
1235 region_storage_path: region_storage_path.clone(),
1236 region_options: region_options.clone(),
1237 };
1238 let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1239
1240 let deserialized = serde_json::from_str(&serialized).unwrap();
1242 let expected = OpenRegion {
1243 region_ident,
1244 region_storage_path,
1245 region_options,
1246 region_wal_options: HashMap::new(),
1247 skip_wal_replay: false,
1248 };
1249 assert_eq!(expected, deserialized);
1250 }
1251
1252 #[test]
1253 fn test_flush_regions_creation() {
1254 let region_id = RegionId::new(1024, 1);
1255
1256 let single_sync = FlushRegions::sync_single(region_id);
1258 assert_eq!(single_sync.region_ids, vec![region_id]);
1259 assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1260 assert!(!single_sync.is_hint());
1261 assert!(single_sync.is_sync());
1262 assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1263 assert!(single_sync.is_single_region());
1264 assert_eq!(single_sync.single_region_id(), Some(region_id));
1265
1266 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1268 let batch_async = FlushRegions::async_batch(region_ids.clone());
1269 assert_eq!(batch_async.region_ids, region_ids);
1270 assert_eq!(batch_async.strategy, FlushStrategy::Async);
1271 assert!(batch_async.is_hint());
1272 assert!(!batch_async.is_sync());
1273 assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1274 assert!(!batch_async.is_single_region());
1275 assert_eq!(batch_async.single_region_id(), None);
1276
1277 let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1279 assert_eq!(batch_sync.region_ids, region_ids);
1280 assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1281 assert!(!batch_sync.is_hint());
1282 assert!(batch_sync.is_sync());
1283 assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1284 }
1285
1286 #[test]
1287 fn test_flush_regions_conversion() {
1288 let region_id = RegionId::new(1024, 1);
1289
1290 let from_region_id: FlushRegions = region_id.into();
1291 assert_eq!(from_region_id.region_ids, vec![region_id]);
1292 assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1293 assert!(!from_region_id.is_hint());
1294 assert!(from_region_id.is_sync());
1295
1296 let flush_regions = FlushRegions {
1298 region_ids: vec![region_id],
1299 strategy: FlushStrategy::Async,
1300 error_strategy: FlushErrorStrategy::TryAll,
1301 };
1302 assert_eq!(flush_regions.region_ids, vec![region_id]);
1303 assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1304 assert!(flush_regions.is_hint());
1305 assert!(!flush_regions.is_sync());
1306 }
1307
1308 #[test]
1309 fn test_flush_region_reply() {
1310 let region_id = RegionId::new(1024, 1);
1311
1312 let success_reply = FlushRegionReply::success_single(region_id);
1314 assert!(success_reply.overall_success);
1315 assert_eq!(success_reply.results.len(), 1);
1316 assert_eq!(success_reply.results[0].0, region_id);
1317 assert!(success_reply.results[0].1.is_ok());
1318
1319 let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1321 assert!(!error_reply.overall_success);
1322 assert_eq!(error_reply.results.len(), 1);
1323 assert_eq!(error_reply.results[0].0, region_id);
1324 assert!(error_reply.results[0].1.is_err());
1325
1326 let region_id2 = RegionId::new(1024, 2);
1328 let results = vec![
1329 (region_id, Ok(())),
1330 (region_id2, Err("flush failed".to_string())),
1331 ];
1332 let batch_reply = FlushRegionReply::from_results(results);
1333 assert!(!batch_reply.overall_success);
1334 assert_eq!(batch_reply.results.len(), 2);
1335
1336 let simple_reply = batch_reply.to_simple_reply();
1338 assert!(!simple_reply.result);
1339 assert!(simple_reply.error.is_some());
1340 assert!(simple_reply.error.unwrap().contains("flush failed"));
1341 }
1342
1343 #[test]
1344 fn test_serialize_flush_regions_instruction() {
1345 let region_id = RegionId::new(1024, 1);
1346 let flush_regions = FlushRegions::sync_single(region_id);
1347 let instruction = Instruction::FlushRegions(flush_regions.clone());
1348
1349 let serialized = serde_json::to_string(&instruction).unwrap();
1350 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1351
1352 match deserialized {
1353 Instruction::FlushRegions(fr) => {
1354 assert_eq!(fr.region_ids, vec![region_id]);
1355 assert_eq!(fr.strategy, FlushStrategy::Sync);
1356 assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1357 }
1358 _ => panic!("Expected FlushRegions instruction"),
1359 }
1360 }
1361
1362 #[test]
1363 fn test_serialize_flush_regions_batch_instruction() {
1364 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1365 let flush_regions =
1366 FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1367 let instruction = Instruction::FlushRegions(flush_regions);
1368
1369 let serialized = serde_json::to_string(&instruction).unwrap();
1370 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1371
1372 match deserialized {
1373 Instruction::FlushRegions(fr) => {
1374 assert_eq!(fr.region_ids, region_ids);
1375 assert_eq!(fr.strategy, FlushStrategy::Sync);
1376 assert!(!fr.is_hint());
1377 assert!(fr.is_sync());
1378 assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1379 }
1380 _ => panic!("Expected FlushRegions instruction"),
1381 }
1382 }
1383
1384 #[test]
1385 fn test_serialize_get_file_refs_instruction_reply() {
1386 let mut manifest = FileRefsManifest::default();
1387 let r0 = RegionId::new(1024, 1);
1388 let r1 = RegionId::new(1024, 2);
1389 manifest.file_refs.insert(
1390 r0,
1391 HashSet::from([FileRef::new(r0, FileId::random(), None)]),
1392 );
1393 manifest.file_refs.insert(
1394 r1,
1395 HashSet::from([FileRef::new(r1, FileId::random(), None)]),
1396 );
1397 manifest.manifest_version.insert(r0, 10);
1398 manifest.manifest_version.insert(r1, 20);
1399
1400 let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1401 file_refs_manifest: manifest,
1402 success: true,
1403 error: None,
1404 });
1405
1406 let serialized = serde_json::to_string(&instruction_reply).unwrap();
1407 let deserialized = serde_json::from_str(&serialized).unwrap();
1408
1409 assert_eq!(instruction_reply, deserialized);
1410 }
1411}