1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
/// Identifies a region by table id and region number, together with a
/// datanode id and an engine name.
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct RegionIdent {
    /// Id of the datanode this identifier refers to.
    pub datanode_id: DatanodeId,
    /// Id of the table; combined with `region_number` to form a `RegionId`.
    pub table_id: TableId,
    /// Region number; combined with `table_id` to form a `RegionId`.
    pub region_number: RegionNumber,
    /// Engine name (the tests in this file use `"mito2"`).
    pub engine: String,
}
38
39impl RegionIdent {
40 pub fn get_region_id(&self) -> RegionId {
41 RegionId::new(self.table_id, self.region_number)
42 }
43}
44
45impl Display for RegionIdent {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50 self.datanode_id, self.table_id, self.region_number, self.engine
51 )
52 }
53}
54
/// Reply for a single region carried by `InstructionReply::DowngradeRegions`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
    /// Id of the region this reply refers to. Falls back to
    /// `RegionId::default()` when the field is absent from the serialized
    /// input.
    #[serde(default)]
    pub region_id: RegionId,
    /// Last entry id reported for the region, if any.
    /// NOTE(review): presumably a WAL entry id — confirm against the producer.
    pub last_entry_id: Option<u64>,
    /// Last entry id reported for the region's metadata, if any.
    /// NOTE(review): exact semantics are not visible here — confirm.
    pub metadata_last_entry_id: Option<u64>,
    /// Whether the region exists (as reported by the replier).
    pub exists: bool,
    /// Error message, if one was reported.
    pub error: Option<String>,
}
71
72impl Display for DowngradeRegionReply {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 write!(
75 f,
76 "(last_entry_id={:?}, exists={}, error={:?})",
77 self.last_entry_id, self.exists, self.error
78 )
79 }
80}
81
/// Minimal reply carrying a success flag and an optional error message.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
    /// `true` when the operation succeeded.
    pub result: bool,
    /// Error message, if one was reported.
    pub error: Option<String>,
}
87
/// Reply to a flush request, with one outcome per region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct FlushRegionReply {
    /// Per-region outcome: `Ok(())` on success, `Err(message)` on failure.
    pub results: Vec<(RegionId, Result<(), String>)>,
    /// Overall outcome flag. `FlushRegionReply::from_results` sets it to
    /// `true` only when every entry in `results` is `Ok`.
    pub overall_success: bool,
}
98
99impl FlushRegionReply {
100 pub fn success_single(region_id: RegionId) -> Self {
102 Self {
103 results: vec![(region_id, Ok(()))],
104 overall_success: true,
105 }
106 }
107
108 pub fn error_single(region_id: RegionId, error: String) -> Self {
110 Self {
111 results: vec![(region_id, Err(error))],
112 overall_success: false,
113 }
114 }
115
116 pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118 let overall_success = results.iter().all(|(_, result)| result.is_ok());
119 Self {
120 results,
121 overall_success,
122 }
123 }
124
125 pub fn to_simple_reply(&self) -> SimpleReply {
127 if self.overall_success {
128 SimpleReply {
129 result: true,
130 error: None,
131 }
132 } else {
133 let errors: Vec<String> = self
134 .results
135 .iter()
136 .filter_map(|(region_id, result)| {
137 result
138 .as_ref()
139 .err()
140 .map(|err| format!("{}: {}", region_id, err))
141 })
142 .collect();
143 SimpleReply {
144 result: false,
145 error: Some(errors.join("; ")),
146 }
147 }
148 }
149}
150
151impl Display for SimpleReply {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 write!(f, "(result={}, error={:?})", self.result, self.error)
154 }
155}
156
157impl Display for FlushRegionReply {
158 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159 let results_str = self
160 .results
161 .iter()
162 .map(|(region_id, result)| match result {
163 Ok(()) => format!("{}:OK", region_id),
164 Err(err) => format!("{}:ERR({})", region_id, err),
165 })
166 .collect::<Vec<_>>()
167 .join(", ");
168 write!(
169 f,
170 "(overall_success={}, results=[{}])",
171 self.overall_success, results_str
172 )
173 }
174}
175
176impl Display for OpenRegion {
177 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178 write!(
179 f,
180 "OpenRegion(region_ident={}, region_storage_path={})",
181 self.region_ident, self.region_storage_path
182 )
183 }
184}
185
/// Payload of an open-region instruction.
///
/// `region_wal_options` and `skip_wal_replay` are marked `#[serde(default)]`,
/// so input lacking those fields still deserializes (see
/// `test_compatible_serialize_open_region` in this file).
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
    /// Identifier of the region to open.
    pub region_ident: RegionIdent,
    /// Storage path of the region.
    pub region_storage_path: String,
    /// Region options as string key/value pairs.
    pub region_options: HashMap<String, String>,
    /// WAL options keyed by region number. Keys go through
    /// `serde_with::DisplayFromStr`, i.e. they are (de)serialized via their
    /// string form. Defaults to an empty map when absent.
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
    /// Defaults to `false` when absent.
    /// NOTE(review): presumably tells the opener to skip WAL replay — confirm.
    #[serde(default)]
    pub skip_wal_replay: bool,
}
198
199impl OpenRegion {
200 pub fn new(
201 region_ident: RegionIdent,
202 path: &str,
203 region_options: HashMap<String, String>,
204 region_wal_options: HashMap<RegionNumber, String>,
205 skip_wal_replay: bool,
206 ) -> Self {
207 Self {
208 region_ident,
209 region_storage_path: path.to_string(),
210 region_options,
211 region_wal_options,
212 skip_wal_replay,
213 }
214 }
215}
216
/// Payload of a downgrade-region instruction for a single region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
    /// Id of the region to downgrade.
    pub region_id: RegionId,
    /// Optional flush timeout; `None` when absent from the serialized input.
    /// NOTE(review): how `None` is interpreted by the handler is not visible
    /// here — confirm.
    #[serde(default)]
    pub flush_timeout: Option<Duration>,
}
228
229impl Display for DowngradeRegion {
230 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231 write!(
232 f,
233 "DowngradeRegion(region_id={}, flush_timeout={:?})",
234 self.region_id, self.flush_timeout,
235 )
236 }
237}
238
/// Payload of an upgrade-region instruction for a single region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UpgradeRegion {
    /// Id of the region to upgrade.
    pub region_id: RegionId,
    /// NOTE(review): presumably the last known entry id of the region —
    /// confirm.
    pub last_entry_id: Option<u64>,
    /// NOTE(review): presumably the last known entry id of the region's
    /// metadata — confirm.
    pub metadata_last_entry_id: Option<u64>,
    /// Replay timeout, (de)serialized through `humantime_serde` (the tests in
    /// this file show one second encoded as `"1s"`).
    #[serde(with = "humantime_serde")]
    pub replay_timeout: Duration,
    /// Optional location id; `None` when absent from the serialized input.
    #[serde(default)]
    pub location_id: Option<u64>,
    /// Optional replay entry id; omitted from the serialized output when
    /// `None`, and `None` when absent from the input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay_entry_id: Option<u64>,
    /// Optional metadata replay entry id; omitted from the serialized output
    /// when `None`, and `None` when absent from the input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata_replay_entry_id: Option<u64>,
}
262
263impl UpgradeRegion {
264 pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266 self.replay_entry_id = replay_entry_id;
267 self
268 }
269
270 pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272 self.metadata_replay_entry_id = metadata_replay_entry_id;
273 self
274 }
275}
276
/// Identifies what a cache entry is keyed on; carried in
/// `Instruction::InvalidateCaches`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CacheIdent {
    /// Identified by flow id.
    FlowId(FlowId),
    /// NOTE(review): presumably signals an address change of the flownode
    /// with the given id — confirm.
    FlowNodeAddressChange(u64),
    /// Identified by flow name.
    FlowName(FlowName),
    /// Identified by table id.
    TableId(TableId),
    /// Identified by table name.
    TableName(TableName),
    /// Identified by schema name.
    SchemaName(SchemaName),
    /// Carries the details of a created flow.
    CreateFlow(CreateFlow),
    /// Carries the details of a dropped flow.
    DropFlow(DropFlow),
}
290
/// Details of a created flow, carried by `CacheIdent::CreateFlow`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
    /// Id of the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Pairs of flow partition id and the peer associated with it.
    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
299
/// Details of a dropped flow, carried by `CacheIdent::DropFlow`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
    /// Id of the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Pairs of flow partition id and the flownode id associated with it.
    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
307
/// Flush mode selected by a [`FlushRegions`] request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushStrategy {
    /// Default mode; `FlushRegions::is_sync` returns `true` for it.
    #[default]
    Sync,
    /// `FlushRegions::is_hint` returns `true` for it.
    /// NOTE(review): presumably a best-effort flush with no awaited result —
    /// confirm against the handler.
    Async,
}
317
/// Error-handling mode selected by a [`FlushRegions`] request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushErrorStrategy {
    /// Default mode.
    /// NOTE(review): presumably stops at the first failing region — confirm
    /// against the handler.
    #[default]
    FailFast,
    /// NOTE(review): presumably attempts every region and collects the
    /// errors — confirm against the handler.
    TryAll,
}
327
/// Payload of a flush instruction covering one or more regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
    /// Ids of the regions to flush.
    pub region_ids: Vec<RegionId>,
    /// Flush mode; falls back to `FlushStrategy::default()` (`Sync`) when
    /// absent from the serialized input.
    #[serde(default)]
    pub strategy: FlushStrategy,
    /// Error-handling mode; falls back to `FlushErrorStrategy::default()`
    /// (`FailFast`) when absent from the serialized input.
    #[serde(default)]
    pub error_strategy: FlushErrorStrategy,
}
341
342impl Display for FlushRegions {
343 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
344 write!(
345 f,
346 "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
347 self.region_ids, self.strategy, self.error_strategy
348 )
349 }
350}
351
352impl FlushRegions {
353 pub fn sync_single(region_id: RegionId) -> Self {
355 Self {
356 region_ids: vec![region_id],
357 strategy: FlushStrategy::Sync,
358 error_strategy: FlushErrorStrategy::FailFast,
359 }
360 }
361
362 pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
364 Self {
365 region_ids,
366 strategy: FlushStrategy::Async,
367 error_strategy: FlushErrorStrategy::TryAll,
368 }
369 }
370
371 pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
373 Self {
374 region_ids,
375 strategy: FlushStrategy::Sync,
376 error_strategy,
377 }
378 }
379
380 pub fn is_single_region(&self) -> bool {
382 self.region_ids.len() == 1
383 }
384
385 pub fn single_region_id(&self) -> Option<RegionId> {
387 if self.is_single_region() {
388 self.region_ids.first().copied()
389 } else {
390 None
391 }
392 }
393
394 pub fn is_hint(&self) -> bool {
396 matches!(self.strategy, FlushStrategy::Async)
397 }
398
399 pub fn is_sync(&self) -> bool {
401 matches!(self.strategy, FlushStrategy::Sync)
402 }
403}
404
405impl From<RegionId> for FlushRegions {
406 fn from(region_id: RegionId) -> Self {
407 Self::sync_single(region_id)
408 }
409}
410
/// Deserialization helper that accepts either one `T` or a list of `T`.
///
/// The enum is `untagged`, so the input carries no variant marker; serde
/// tries the variants in declaration order (`Single`, then `Multiple`).
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// Input was a single value.
    Single(T),
    /// Input was a sequence of values.
    Multiple(Vec<T>),
}
417
418fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
419where
420 D: Deserializer<'de>,
421 T: Deserialize<'de>,
422{
423 let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
424 Ok(match helper {
425 SingleOrMultiple::Single(x) => vec![x],
426 SingleOrMultiple::Multiple(xs) => xs,
427 })
428}
429
/// Payload of a get-file-refs instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
    /// Ids of the regions being queried.
    pub query_regions: Vec<RegionId>,
    /// Map from a region id to a list of related region ids.
    /// NOTE(review): the direction and meaning of the relation are not
    /// visible here — confirm against the handler.
    pub related_regions: HashMap<RegionId, Vec<RegionId>>,
}
441
442impl Display for GetFileRefs {
443 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
444 write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
445 }
446}
447
/// Payload of a GC instruction covering one or more regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
    /// Ids of the regions to garbage-collect.
    pub regions: Vec<RegionId>,
    /// File-reference manifest supplied with the request.
    pub file_refs_manifest: FileRefsManifest,
    /// NOTE(review): presumably requests a full listing of files rather than
    /// an incremental one — confirm against the handler.
    pub full_file_listing: bool,
}
458
459impl Display for GcRegions {
460 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
461 write!(
462 f,
463 "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
464 self.regions,
465 self.file_refs_manifest.file_refs.len(),
466 self.full_file_listing
467 )
468 }
469}
470
/// Reply to a get-file-refs instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefsReply {
    /// File-reference manifest returned by the replier.
    pub file_refs_manifest: FileRefsManifest,
    /// `true` when the request succeeded.
    pub success: bool,
    /// Error message, if one was reported.
    pub error: Option<String>,
}
481
482impl Display for GetFileRefsReply {
483 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
484 write!(
485 f,
486 "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
487 self.success,
488 self.file_refs_manifest.file_refs.len(),
489 self.error
490 )
491 }
492}
493
/// Reply to a GC instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegionsReply {
    /// The GC report on success, or an error message on failure.
    pub result: Result<GcReport, String>,
}
499
500impl Display for GcRegionsReply {
501 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
502 write!(
503 f,
504 "GcReply(result={})",
505 match &self.result {
506 Ok(report) => format!(
507 "GcReport(deleted_files_count={}, need_retry_regions_count={})",
508 report.deleted_files.len(),
509 report.need_retry_regions.len()
510 ),
511 Err(err) => format!("Err({})", err),
512 }
513 )
514 }
515}
516
/// Payload of an enter-staging instruction for a single region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EnterStagingRegion {
    /// Id of the region.
    pub region_id: RegionId,
    /// Partition expression in string form.
    /// NOTE(review): the encoding of the expression is not visible here —
    /// confirm.
    pub partition_expr: String,
}
522
523impl Display for EnterStagingRegion {
524 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
525 write!(
526 f,
527 "EnterStagingRegion(region_id={}, partition_expr={})",
528 self.region_id, self.partition_expr
529 )
530 }
531}
532
/// Payload of a remap-manifest instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
    /// Id of the region the instruction is addressed to.
    /// NOTE(review): its exact role relative to `input_regions` is not
    /// visible here — confirm.
    pub region_id: RegionId,
    /// Ids of the input regions.
    pub input_regions: Vec<RegionId>,
    /// Map from a region id to a list of region ids.
    /// NOTE(review): the direction of the mapping is not visible here —
    /// confirm.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expression (string form) per region id.
    pub new_partition_exprs: HashMap<RegionId, String>,
}
543
544impl Display for RemapManifest {
545 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
546 write!(
547 f,
548 "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
549 self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
550 )
551 }
552}
553
/// Payload of an apply-staging-manifest instruction for a single region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApplyStagingManifest {
    /// Id of the region the manifest is applied to.
    pub region_id: RegionId,
    /// Partition expression in string form.
    pub partition_expr: String,
    /// Id of the central region.
    /// NOTE(review): what "central" means is not visible here — confirm.
    pub central_region_id: RegionId,
    /// Path of the manifest.
    pub manifest_path: String,
}
565
566impl Display for ApplyStagingManifest {
567 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
568 write!(
569 f,
570 "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
571 self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
572 )
573 }
574}
575
/// Instruction message, serialized with serde's default externally tagged
/// enum representation (e.g. `{"OpenRegions":[...]}`).
///
/// The first four variants also accept their singular alias as the tag and,
/// through `single_or_multiple_from`, either a single payload object or a
/// list of payloads (see `test_deserialize_instruction` in this file).
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
    /// Open the listed regions. Alias: `OpenRegion`.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
    OpenRegions(Vec<OpenRegion>),
    /// Close the listed regions. Alias: `CloseRegion`.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
    CloseRegions(Vec<RegionIdent>),
    /// Upgrade the listed regions. Alias: `UpgradeRegion`.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
    UpgradeRegions(Vec<UpgradeRegion>),
    /// Downgrade the listed regions. Alias: `DowngradeRegion`.
    #[serde(
        deserialize_with = "single_or_multiple_from",
        alias = "DowngradeRegion"
    )]
    DowngradeRegions(Vec<DowngradeRegion>),
    /// Invalidate the caches identified by the listed idents.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flush regions as described by the payload.
    FlushRegions(FlushRegions),
    /// Query file references as described by the payload.
    GetFileRefs(GetFileRefs),
    /// Garbage-collect regions as described by the payload.
    GcRegions(GcRegions),
    /// Payload-free instruction.
    /// NOTE(review): what gets suspended is not visible here — confirm.
    Suspend,
    /// Make the listed regions enter staging.
    EnterStagingRegions(Vec<EnterStagingRegion>),
    /// Remap a manifest as described by the payload.
    RemapManifest(RemapManifest),
    /// Apply the listed staging manifests.
    ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
610
611impl Instruction {
612 pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
614 match self {
615 Self::OpenRegions(open_regions) => Some(open_regions),
616 _ => None,
617 }
618 }
619
620 pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
622 match self {
623 Self::CloseRegions(close_regions) => Some(close_regions),
624 _ => None,
625 }
626 }
627
628 pub fn into_flush_regions(self) -> Option<FlushRegions> {
630 match self {
631 Self::FlushRegions(flush_regions) => Some(flush_regions),
632 _ => None,
633 }
634 }
635
636 pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
638 match self {
639 Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
640 _ => None,
641 }
642 }
643
644 pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
646 match self {
647 Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
648 _ => None,
649 }
650 }
651
652 pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
653 match self {
654 Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
655 _ => None,
656 }
657 }
658
659 pub fn into_gc_regions(self) -> Option<GcRegions> {
660 match self {
661 Self::GcRegions(gc_regions) => Some(gc_regions),
662 _ => None,
663 }
664 }
665
666 pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
667 match self {
668 Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
669 _ => None,
670 }
671 }
672}
673
/// Reply for a single region carried by `InstructionReply::UpgradeRegions`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
    /// Id of the region this reply refers to. Falls back to
    /// `RegionId::default()` when the field is absent from the serialized
    /// input.
    #[serde(default)]
    pub region_id: RegionId,
    /// Whether the region is ready (as reported by the replier).
    pub ready: bool,
    /// Whether the region exists (as reported by the replier).
    pub exists: bool,
    /// Error message, if one was reported.
    pub error: Option<String>,
}
688
689impl Display for UpgradeRegionReply {
690 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
691 write!(
692 f,
693 "(ready={}, exists={}, error={:?})",
694 self.ready, self.exists, self.error
695 )
696 }
697}
698
/// Collection of per-region downgrade replies.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionsReply {
    /// One reply per region.
    pub replies: Vec<DowngradeRegionReply>,
}
703
704impl DowngradeRegionsReply {
705 pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
706 Self { replies }
707 }
708
709 pub fn single(reply: DowngradeRegionReply) -> Self {
710 Self::new(vec![reply])
711 }
712}
713
/// Deserialization helper accepting either a single downgrade reply or the
/// `replies` wrapper.
///
/// The enum is `untagged`, so serde tries the variants in declaration order
/// (`Single`, then `Multiple`).
#[derive(Deserialize)]
#[serde(untagged)]
enum DowngradeRegionsCompat {
    /// Input was a bare `DowngradeRegionReply`.
    Single(DowngradeRegionReply),
    /// Input was a `DowngradeRegionsReply`.
    Multiple(DowngradeRegionsReply),
}
720
721fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
722where
723 D: Deserializer<'de>,
724{
725 let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
726 Ok(match helper {
727 DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
728 DowngradeRegionsCompat::Multiple(reply) => reply,
729 })
730}
731
/// Collection of per-region upgrade replies.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionsReply {
    /// One reply per region.
    pub replies: Vec<UpgradeRegionReply>,
}
736
737impl UpgradeRegionsReply {
738 pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
739 Self { replies }
740 }
741
742 pub fn single(reply: UpgradeRegionReply) -> Self {
743 Self::new(vec![reply])
744 }
745}
746
/// Deserialization helper accepting either a single upgrade reply or the
/// `replies` wrapper.
///
/// The enum is `untagged`, so serde tries the variants in declaration order
/// (`Single`, then `Multiple`).
#[derive(Deserialize)]
#[serde(untagged)]
enum UpgradeRegionsCompat {
    /// Input was a bare `UpgradeRegionReply`.
    Single(UpgradeRegionReply),
    /// Input was an `UpgradeRegionsReply`.
    Multiple(UpgradeRegionsReply),
}
753
754fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
755where
756 D: Deserializer<'de>,
757{
758 let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
759 Ok(match helper {
760 UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
761 UpgradeRegionsCompat::Multiple(reply) => reply,
762 })
763}
764
/// Reply for a single region carried by
/// `InstructionReply::EnterStagingRegions`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionReply {
    /// Id of the region this reply refers to.
    pub region_id: RegionId,
    /// Whether the region is ready (as reported by the replier).
    pub ready: bool,
    /// Whether the region exists (as reported by the replier).
    pub exists: bool,
    /// Error message, if one was reported.
    pub error: Option<String>,
}
775
/// Collection of per-region enter-staging replies.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionsReply {
    /// One reply per region.
    pub replies: Vec<EnterStagingRegionReply>,
}
780
781impl EnterStagingRegionsReply {
782 pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
783 Self { replies }
784 }
785}
786
/// Reply to a remap-manifest instruction.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
    /// Whether the region exists (as reported by the replier).
    pub exists: bool,
    /// Manifest path per region id.
    pub manifest_paths: HashMap<RegionId, String>,
    /// Error message, if one was reported.
    pub error: Option<String>,
}
796
797impl Display for RemapManifestReply {
798 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
799 write!(
800 f,
801 "RemapManifestReply(manifest_paths={:?}, error={:?})",
802 self.manifest_paths, self.error
803 )
804 }
805}
806
/// Collection of per-region apply-staging-manifest replies.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestsReply {
    /// One reply per region.
    pub replies: Vec<ApplyStagingManifestReply>,
}
811
812impl ApplyStagingManifestsReply {
813 pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
814 Self { replies }
815 }
816}
817
/// Reply for a single region carried by
/// `InstructionReply::ApplyStagingManifests`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestReply {
    /// Id of the region this reply refers to.
    pub region_id: RegionId,
    /// Whether the region is ready (as reported by the replier).
    pub ready: bool,
    /// Whether the region exists (as reported by the replier).
    pub exists: bool,
    /// Error message, if one was reported.
    pub error: Option<String>,
}
828
/// Reply message, serialized as an internally tagged enum: the variant name
/// in snake_case is stored in a `"type"` field next to the payload's own
/// fields (e.g. `{"type":"upgrade_regions","replies":[...]}`).
///
/// The first four variants also accept their singular alias as the tag.
/// `UpgradeRegions` and `DowngradeRegions` additionally accept a bare
/// single-region reply through their `*_compat_from` deserializers (see
/// `test_deserialize_instruction_reply` in this file).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
    /// Reply to `Instruction::OpenRegions`. Alias: `open_region`.
    #[serde(alias = "open_region")]
    OpenRegions(SimpleReply),
    /// Reply to `Instruction::CloseRegions`. Alias: `close_region`.
    #[serde(alias = "close_region")]
    CloseRegions(SimpleReply),
    /// Reply to `Instruction::UpgradeRegions`. Alias: `upgrade_region`.
    #[serde(
        deserialize_with = "upgrade_regions_compat_from",
        alias = "upgrade_region"
    )]
    UpgradeRegions(UpgradeRegionsReply),
    /// Reply to `Instruction::DowngradeRegions`. Alias: `downgrade_region`.
    #[serde(
        alias = "downgrade_region",
        deserialize_with = "downgrade_regions_compat_from"
    )]
    DowngradeRegions(DowngradeRegionsReply),
    /// Reply to `Instruction::FlushRegions`.
    FlushRegions(FlushRegionReply),
    /// Reply to `Instruction::GetFileRefs`.
    GetFileRefs(GetFileRefsReply),
    /// Reply to `Instruction::GcRegions`.
    GcRegions(GcRegionsReply),
    /// Reply to `Instruction::EnterStagingRegions`.
    EnterStagingRegions(EnterStagingRegionsReply),
    /// Reply to `Instruction::RemapManifest`.
    RemapManifest(RemapManifestReply),
    /// Reply to `Instruction::ApplyStagingManifests`.
    ApplyStagingManifests(ApplyStagingManifestsReply),
}
853
854impl Display for InstructionReply {
855 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
856 match self {
857 Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
858 Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
859 Self::UpgradeRegions(reply) => {
860 write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
861 }
862 Self::DowngradeRegions(reply) => {
863 write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
864 }
865 Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
866 Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
867 Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
868 Self::EnterStagingRegions(reply) => {
869 write!(
870 f,
871 "InstructionReply::EnterStagingRegions({:?})",
872 reply.replies
873 )
874 }
875 Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
876 Self::ApplyStagingManifests(reply) => write!(
877 f,
878 "InstructionReply::ApplyStagingManifests({:?})",
879 reply.replies
880 ),
881 }
882 }
883}
884
/// Unwrapping helpers, compiled only for tests or with the `testing` feature.
/// Each one panics when the reply is a different variant.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Returns the payload of a `CloseRegions` reply; panics otherwise.
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        if let Self::CloseRegions(reply) = self {
            return reply;
        }
        panic!("Expected CloseRegions reply")
    }

    /// Returns the payload of an `OpenRegions` reply; panics otherwise.
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        if let Self::OpenRegions(reply) = self {
            return reply;
        }
        panic!("Expected OpenRegions reply")
    }

    /// Returns the reply list of an `UpgradeRegions` reply; panics otherwise.
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        if let Self::UpgradeRegions(reply) = self {
            return reply.replies;
        }
        panic!("Expected UpgradeRegion reply")
    }

    /// Returns the reply list of a `DowngradeRegions` reply; panics otherwise.
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        if let Self::DowngradeRegions(reply) = self {
            return reply.replies;
        }
        panic!("Expected DowngradeRegion reply")
    }

    /// Returns the payload of a `FlushRegions` reply; panics otherwise.
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        if let Self::FlushRegions(reply) = self {
            return reply;
        }
        panic!("Expected FlushRegions reply")
    }

    /// Returns the reply list of an `EnterStagingRegions` reply; panics
    /// otherwise.
    pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
        if let Self::EnterStagingRegions(reply) = self {
            return reply.replies;
        }
        panic!("Expected EnterStagingRegion reply")
    }

    /// Returns the payload of a `RemapManifest` reply; panics otherwise.
    pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
        if let Self::RemapManifest(reply) = self {
            return reply;
        }
        panic!("Expected RemapManifest reply")
    }

    /// Returns the reply list of an `ApplyStagingManifests` reply; panics
    /// otherwise.
    pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
        if let Self::ApplyStagingManifests(reply) = self {
            return reply.replies;
        }
        panic!("Expected ApplyStagingManifest reply")
    }
}
943
944#[cfg(test)]
945mod tests {
946 use std::collections::HashSet;
947
948 use store_api::storage::{FileId, FileRef};
949
950 use super::*;
951
952 #[test]
953 fn test_serialize_instruction() {
954 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
955 RegionIdent {
956 datanode_id: 2,
957 table_id: 1024,
958 region_number: 1,
959 engine: "mito2".to_string(),
960 },
961 "test/foo",
962 HashMap::new(),
963 HashMap::new(),
964 false,
965 )]);
966
967 let serialized = serde_json::to_string(&open_region).unwrap();
968 assert_eq!(
969 r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
970 serialized
971 );
972
973 let close_region = Instruction::CloseRegions(vec![RegionIdent {
974 datanode_id: 2,
975 table_id: 1024,
976 region_number: 1,
977 engine: "mito2".to_string(),
978 }]);
979
980 let serialized = serde_json::to_string(&close_region).unwrap();
981 assert_eq!(
982 r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
983 serialized
984 );
985
986 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
987 region_id: RegionId::new(1024, 1),
988 last_entry_id: None,
989 metadata_last_entry_id: None,
990 replay_timeout: Duration::from_millis(1000),
991 location_id: None,
992 replay_entry_id: None,
993 metadata_replay_entry_id: None,
994 }]);
995
996 let serialized = serde_json::to_string(&upgrade_region).unwrap();
997 assert_eq!(
998 r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
999 serialized
1000 );
1001 }
1002
1003 #[test]
1004 fn test_serialize_instruction_reply() {
1005 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1006 DowngradeRegionsReply::single(DowngradeRegionReply {
1007 region_id: RegionId::new(1024, 1),
1008 last_entry_id: None,
1009 metadata_last_entry_id: None,
1010 exists: true,
1011 error: None,
1012 }),
1013 );
1014
1015 let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
1016 assert_eq!(
1017 r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
1018 serialized
1019 );
1020
1021 let upgrade_region_reply =
1022 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1023 region_id: RegionId::new(1024, 1),
1024 ready: true,
1025 exists: true,
1026 error: None,
1027 }));
1028 let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
1029 assert_eq!(
1030 r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
1031 serialized
1032 );
1033 }
1034
1035 #[test]
1036 fn test_deserialize_instruction() {
1037 let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
1039 let open_region_instruction: Instruction =
1040 serde_json::from_str(open_region_instruction).unwrap();
1041 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1042 RegionIdent {
1043 datanode_id: 2,
1044 table_id: 1024,
1045 region_number: 1,
1046 engine: "mito2".to_string(),
1047 },
1048 "test/foo",
1049 HashMap::new(),
1050 HashMap::new(),
1051 false,
1052 )]);
1053 assert_eq!(open_region_instruction, open_region);
1054
1055 let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
1057 let close_region_instruction: Instruction =
1058 serde_json::from_str(close_region_instruction).unwrap();
1059 let close_region = Instruction::CloseRegions(vec![RegionIdent {
1060 datanode_id: 2,
1061 table_id: 1024,
1062 region_number: 1,
1063 engine: "mito2".to_string(),
1064 }]);
1065 assert_eq!(close_region_instruction, close_region);
1066
1067 let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
1069 let downgrade_region_instruction: Instruction =
1070 serde_json::from_str(downgrade_region_instruction).unwrap();
1071 let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
1072 region_id: RegionId::new(1024, 1),
1073 flush_timeout: Some(Duration::from_millis(1000)),
1074 }]);
1075 assert_eq!(downgrade_region_instruction, downgrade_region);
1076
1077 let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
1079 let upgrade_region_instruction: Instruction =
1080 serde_json::from_str(upgrade_region_instruction).unwrap();
1081 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1082 region_id: RegionId::new(1024, 1),
1083 last_entry_id: None,
1084 metadata_last_entry_id: None,
1085 replay_timeout: Duration::from_millis(1000),
1086 location_id: None,
1087 replay_entry_id: None,
1088 metadata_replay_entry_id: None,
1089 }]);
1090 assert_eq!(upgrade_region_instruction, upgrade_region);
1091 }
1092
1093 #[test]
1094 fn test_deserialize_instruction_reply() {
1095 let close_region_instruction_reply =
1097 r#"{"result":true,"error":null,"type":"close_region"}"#;
1098 let close_region_instruction_reply: InstructionReply =
1099 serde_json::from_str(close_region_instruction_reply).unwrap();
1100 let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
1101 result: true,
1102 error: None,
1103 });
1104 assert_eq!(close_region_instruction_reply, close_region_reply);
1105
1106 let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
1108 let open_region_instruction_reply: InstructionReply =
1109 serde_json::from_str(open_region_instruction_reply).unwrap();
1110 let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1111 result: true,
1112 error: None,
1113 });
1114 assert_eq!(open_region_instruction_reply, open_region_reply);
1115
1116 let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1118 let downgrade_region_instruction_reply: InstructionReply =
1119 serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1120 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1121 DowngradeRegionsReply::single(DowngradeRegionReply {
1122 region_id: RegionId::new(1024, 1),
1123 last_entry_id: None,
1124 metadata_last_entry_id: None,
1125 exists: true,
1126 error: None,
1127 }),
1128 );
1129 assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1130
1131 let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1133 let upgrade_region_instruction_reply: InstructionReply =
1134 serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1135 let upgrade_region_reply =
1136 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1137 region_id: RegionId::new(1024, 1),
1138 ready: true,
1139 exists: true,
1140 error: None,
1141 }));
1142 assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1143 }
1144
1145 #[derive(Debug, Clone, Serialize, Deserialize)]
1146 struct LegacyOpenRegion {
1147 region_ident: RegionIdent,
1148 region_storage_path: String,
1149 region_options: HashMap<String, String>,
1150 }
1151
1152 #[test]
1153 fn test_compatible_serialize_open_region() {
1154 let region_ident = RegionIdent {
1155 datanode_id: 2,
1156 table_id: 1024,
1157 region_number: 1,
1158 engine: "mito2".to_string(),
1159 };
1160 let region_storage_path = "test/foo".to_string();
1161 let region_options = HashMap::from([
1162 ("a".to_string(), "aa".to_string()),
1163 ("b".to_string(), "bb".to_string()),
1164 ]);
1165
1166 let legacy_open_region = LegacyOpenRegion {
1168 region_ident: region_ident.clone(),
1169 region_storage_path: region_storage_path.clone(),
1170 region_options: region_options.clone(),
1171 };
1172 let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1173
1174 let deserialized = serde_json::from_str(&serialized).unwrap();
1176 let expected = OpenRegion {
1177 region_ident,
1178 region_storage_path,
1179 region_options,
1180 region_wal_options: HashMap::new(),
1181 skip_wal_replay: false,
1182 };
1183 assert_eq!(expected, deserialized);
1184 }
1185
/// Checks the fields and accessor results produced by each `FlushRegions`
/// constructor: `sync_single`, `async_batch` and `sync_batch`.
#[test]
fn test_flush_regions_creation() {
    // One region, synchronous: asserted to be Sync + FailFast and single-region.
    {
        let only_region = RegionId::new(1024, 1);
        let sync_one = FlushRegions::sync_single(only_region);

        assert_eq!(sync_one.region_ids, vec![only_region]);
        assert_eq!(sync_one.strategy, FlushStrategy::Sync);
        assert!(!sync_one.is_hint());
        assert!(sync_one.is_sync());
        assert_eq!(sync_one.error_strategy, FlushErrorStrategy::FailFast);
        assert!(sync_one.is_single_region());
        assert_eq!(sync_one.single_region_id(), Some(only_region));
    }

    let two_regions = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

    // Several regions, asynchronous: asserted to be a hint with TryAll.
    {
        let async_many = FlushRegions::async_batch(two_regions.clone());

        assert_eq!(async_many.region_ids, two_regions);
        assert_eq!(async_many.strategy, FlushStrategy::Async);
        assert!(async_many.is_hint());
        assert!(!async_many.is_sync());
        assert_eq!(async_many.error_strategy, FlushErrorStrategy::TryAll);
        assert!(!async_many.is_single_region());
        assert_eq!(async_many.single_region_id(), None);
    }

    // Several regions, synchronous, with the error strategy passed explicitly.
    {
        let sync_many =
            FlushRegions::sync_batch(two_regions.clone(), FlushErrorStrategy::FailFast);

        assert_eq!(sync_many.region_ids, two_regions);
        assert_eq!(sync_many.strategy, FlushStrategy::Sync);
        assert!(!sync_many.is_hint());
        assert!(sync_many.is_sync());
        assert_eq!(sync_many.error_strategy, FlushErrorStrategy::FailFast);
    }
}
1219
/// Checks `FlushRegions` obtained via `Into` from a `RegionId`, and one built
/// directly as a struct literal.
#[test]
fn test_flush_regions_conversion() {
    let target = RegionId::new(1024, 1);

    // Conversion from a bare region id is asserted to give a sync request.
    {
        let converted: FlushRegions = target.into();

        assert_eq!(converted.region_ids, vec![target]);
        assert_eq!(converted.strategy, FlushStrategy::Sync);
        assert!(!converted.is_hint());
        assert!(converted.is_sync());
    }

    // A hand-built async request is asserted to report itself as a hint.
    {
        let handwritten = FlushRegions {
            region_ids: vec![target],
            strategy: FlushStrategy::Async,
            error_strategy: FlushErrorStrategy::TryAll,
        };

        assert_eq!(handwritten.region_ids, vec![target]);
        assert_eq!(handwritten.strategy, FlushStrategy::Async);
        assert!(handwritten.is_hint());
        assert!(!handwritten.is_sync());
    }
}
1241
/// Checks the `FlushRegionReply` constructors (`success_single`,
/// `error_single`, `from_results`) and the `to_simple_reply` conversion.
#[test]
fn test_flush_region_reply() {
    let first = RegionId::new(1024, 1);

    // Single successful flush.
    let ok_reply = FlushRegionReply::success_single(first);
    assert!(ok_reply.overall_success);
    assert_eq!(ok_reply.results.len(), 1);
    let (ok_region, ok_outcome) = &ok_reply.results[0];
    assert_eq!(*ok_region, first);
    assert!(ok_outcome.is_ok());

    // Single failed flush.
    let failed_reply = FlushRegionReply::error_single(first, String::from("test error"));
    assert!(!failed_reply.overall_success);
    assert_eq!(failed_reply.results.len(), 1);
    let (failed_region, failed_outcome) = &failed_reply.results[0];
    assert_eq!(*failed_region, first);
    assert!(failed_outcome.is_err());

    // Mixed batch: one failure is asserted to make the whole reply unsuccessful.
    let second = RegionId::new(1024, 2);
    let mixed = FlushRegionReply::from_results(vec![
        (first, Ok(())),
        (second, Err(String::from("flush failed"))),
    ]);
    assert!(!mixed.overall_success);
    assert_eq!(mixed.results.len(), 2);

    // The simple reply is asserted to carry the failure text.
    let simple = mixed.to_simple_reply();
    assert!(!simple.result);
    assert!(simple.error.is_some());
    assert!(simple.error.unwrap().contains("flush failed"));
}
1276
/// Round-trips a single-region `Instruction::FlushRegions` through JSON and
/// checks that region ids, strategy and error strategy come back unchanged.
#[test]
fn test_serialize_flush_regions_instruction() {
    let region_id = RegionId::new(1024, 1);
    let flush_regions = FlushRegions::sync_single(region_id);
    // Moved rather than cloned: `flush_regions` is not read again, which also
    // matches how the batch round-trip test builds its instruction.
    let instruction = Instruction::FlushRegions(flush_regions);

    let serialized = serde_json::to_string(&instruction).unwrap();
    let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();

    match deserialized {
        Instruction::FlushRegions(fr) => {
            assert_eq!(fr.region_ids, vec![region_id]);
            assert_eq!(fr.strategy, FlushStrategy::Sync);
            assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
        }
        // Any other variant means the enum tag did not survive the round trip.
        _ => panic!("Expected FlushRegions instruction"),
    }
}
1295
/// Round-trips a synchronous, `TryAll` batch `Instruction::FlushRegions`
/// through JSON and checks the decoded payload field by field.
#[test]
fn test_serialize_flush_regions_batch_instruction() {
    let batch = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
    let instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
        batch.clone(),
        FlushErrorStrategy::TryAll,
    ));

    let json = serde_json::to_string(&instruction).unwrap();
    let decoded: Instruction = serde_json::from_str(&json).unwrap();

    // Pull the payload out first; any other variant fails the test.
    let payload = match decoded {
        Instruction::FlushRegions(inner) => inner,
        _ => panic!("Expected FlushRegions instruction"),
    };

    assert_eq!(payload.region_ids, batch);
    assert_eq!(payload.strategy, FlushStrategy::Sync);
    assert!(!payload.is_hint());
    assert!(payload.is_sync());
    assert_eq!(payload.error_strategy, FlushErrorStrategy::TryAll);
}
1317
/// Round-trips an `InstructionReply::GetFileRefs` holding a two-region
/// manifest through JSON and expects the decoded value to equal the original.
#[test]
fn test_serialize_get_file_refs_instruction_reply() {
    let r0 = RegionId::new(1024, 1);
    let r1 = RegionId::new(1024, 2);

    // Each region gets one randomly-identified file ref and a manifest version.
    let mut manifest = FileRefsManifest::default();
    for (region, version) in [(r0, 10), (r1, 20)] {
        let refs = HashSet::from([FileRef::new(region, FileId::random(), None)]);
        manifest.file_refs.insert(region, refs);
        manifest.manifest_version.insert(region, version);
    }

    let reply = InstructionReply::GetFileRefs(GetFileRefsReply {
        file_refs_manifest: manifest,
        success: true,
        error: None,
    });

    let json = serde_json::to_string(&reply).unwrap();
    let decoded: InstructionReply = serde_json::from_str(&json).unwrap();

    assert_eq!(reply, decoded);
}
1345}