1use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use store_api::region_engine::SyncRegionFromRequest;
21use store_api::region_request::{RegionFlushReason, RegionRequirements};
22use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
23use strum::Display;
24use table::metadata::TableId;
25use table::table_name::TableName;
26
27use crate::flow_name::FlowName;
28use crate::key::schema_name::SchemaName;
29use crate::key::{FlowId, FlowPartitionId};
30use crate::peer::Peer;
31use crate::wal_provider::{RegionWalOptions, region_wal_options_serde};
32use crate::{DatanodeId, FlownodeId};
33
34#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
35pub struct RegionIdent {
36 pub datanode_id: DatanodeId,
37 pub table_id: TableId,
38 pub region_number: RegionNumber,
39 pub engine: String,
40}
41
42impl RegionIdent {
43 pub fn get_region_id(&self) -> RegionId {
44 RegionId::new(self.table_id, self.region_number)
45 }
46}
47
48impl Display for RegionIdent {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 write!(
51 f,
52 "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
53 self.datanode_id, self.table_id, self.region_number, self.engine
54 )
55 }
56}
57
58#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
60pub struct DowngradeRegionReply {
61 #[serde(default)]
64 pub region_id: RegionId,
65 pub last_entry_id: Option<u64>,
67 pub metadata_last_entry_id: Option<u64>,
69 pub exists: bool,
71 pub error: Option<String>,
73}
74
75impl Display for DowngradeRegionReply {
76 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77 write!(
78 f,
79 "(last_entry_id={:?}, exists={}, error={:?})",
80 self.last_entry_id, self.exists, self.error
81 )
82 }
83}
84
85#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
86pub struct SimpleReply {
87 pub result: bool,
88 pub error: Option<String>,
89}
90
91#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
93pub struct FlushRegionReply {
94 pub results: Vec<(RegionId, Result<(), String>)>,
98 pub overall_success: bool,
100}
101
102impl FlushRegionReply {
103 pub fn success_single(region_id: RegionId) -> Self {
105 Self {
106 results: vec![(region_id, Ok(()))],
107 overall_success: true,
108 }
109 }
110
111 pub fn error_single(region_id: RegionId, error: String) -> Self {
113 Self {
114 results: vec![(region_id, Err(error))],
115 overall_success: false,
116 }
117 }
118
119 pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
121 let overall_success = results.iter().all(|(_, result)| result.is_ok());
122 Self {
123 results,
124 overall_success,
125 }
126 }
127
128 pub fn to_simple_reply(&self) -> SimpleReply {
130 if self.overall_success {
131 SimpleReply {
132 result: true,
133 error: None,
134 }
135 } else {
136 let errors: Vec<String> = self
137 .results
138 .iter()
139 .filter_map(|(region_id, result)| {
140 result
141 .as_ref()
142 .err()
143 .map(|err| format!("{}: {}", region_id, err))
144 })
145 .collect();
146 SimpleReply {
147 result: false,
148 error: Some(errors.join("; ")),
149 }
150 }
151 }
152}
153
154impl Display for SimpleReply {
155 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
156 write!(f, "(result={}, error={:?})", self.result, self.error)
157 }
158}
159
160impl Display for FlushRegionReply {
161 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
162 let results_str = self
163 .results
164 .iter()
165 .map(|(region_id, result)| match result {
166 Ok(()) => format!("{}:OK", region_id),
167 Err(err) => format!("{}:ERR({})", region_id, err),
168 })
169 .collect::<Vec<_>>()
170 .join(", ");
171 write!(
172 f,
173 "(overall_success={}, results=[{}])",
174 self.overall_success, results_str
175 )
176 }
177}
178
179impl Display for OpenRegion {
180 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
181 write!(
182 f,
183 "OpenRegion(region_ident={}, region_storage_path={}, reason={:?})",
184 self.region_ident, self.region_storage_path, self.reason
185 )
186 }
187}
188
189#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
191pub enum OpenRegionReason {
192 RegionMigration,
194 RegionFailover,
196 #[cfg(feature = "enterprise")]
198 RegionFollower,
199}
200
201#[serde_with::serde_as]
202#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
203pub struct OpenRegion {
204 pub region_ident: RegionIdent,
205 pub region_storage_path: String,
206 pub region_options: HashMap<String, String>,
207 #[serde(default)]
208 #[serde(with = "region_wal_options_serde")]
209 pub region_wal_options: RegionWalOptions,
210 #[serde(default)]
211 pub skip_wal_replay: bool,
212 #[serde(default, skip_serializing_if = "Option::is_none")]
213 pub reason: Option<OpenRegionReason>,
214 #[serde(default)]
215 pub requirements: RegionRequirements,
216}
217
218impl OpenRegion {
219 pub fn new(
220 region_ident: RegionIdent,
221 path: &str,
222 region_options: HashMap<String, String>,
223 region_wal_options: RegionWalOptions,
224 skip_wal_replay: bool,
225 reason: Option<OpenRegionReason>,
226 requirements: RegionRequirements,
227 ) -> Self {
228 Self {
229 region_ident,
230 region_storage_path: path.to_string(),
231 region_options,
232 region_wal_options,
233 skip_wal_replay,
234 reason,
235 requirements,
236 }
237 }
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
242pub struct DowngradeRegion {
243 pub region_id: RegionId,
245 #[serde(default)]
249 pub flush_timeout: Option<Duration>,
250}
251
252impl Display for DowngradeRegion {
253 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
254 write!(
255 f,
256 "DowngradeRegion(region_id={}, flush_timeout={:?})",
257 self.region_id, self.flush_timeout,
258 )
259 }
260}
261
262#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264pub struct UpgradeRegion {
265 pub region_id: RegionId,
267 pub last_entry_id: Option<u64>,
269 pub metadata_last_entry_id: Option<u64>,
271 #[serde(with = "humantime_serde")]
276 pub replay_timeout: Duration,
277 #[serde(default)]
279 pub location_id: Option<u64>,
280 #[serde(default, skip_serializing_if = "Option::is_none")]
281 pub replay_entry_id: Option<u64>,
282 #[serde(default, skip_serializing_if = "Option::is_none")]
283 pub metadata_replay_entry_id: Option<u64>,
284}
285
286impl UpgradeRegion {
287 pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
289 self.replay_entry_id = replay_entry_id;
290 self
291 }
292
293 pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
295 self.metadata_replay_entry_id = metadata_replay_entry_id;
296 self
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
301pub enum CacheIdent {
303 FlowId(FlowId),
304 FlowNodeAddressChange(u64),
306 FlowName(FlowName),
307 TableId(TableId),
308 TableName(TableName),
309 SchemaName(SchemaName),
310 CreateFlow(CreateFlow),
311 DropFlow(DropFlow),
312 User(UserCacheIdent),
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
317pub struct UserCacheIdent {
318 pub catalog: String,
319 pub username: String,
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
323pub struct CreateFlow {
324 pub flow_id: FlowId,
326 pub source_table_ids: Vec<TableId>,
327 pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
332pub struct DropFlow {
333 pub flow_id: FlowId,
334 pub source_table_ids: Vec<TableId>,
335 pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
337}
338
339#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
341pub enum FlushStrategy {
342 #[default]
344 Sync,
345 Async,
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
351pub enum FlushErrorStrategy {
352 #[default]
354 FailFast,
355 TryAll,
357}
358
359#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
362pub struct FlushRegions {
363 pub region_ids: Vec<RegionId>,
365 #[serde(default)]
367 pub strategy: FlushStrategy,
368 #[serde(default)]
370 pub error_strategy: FlushErrorStrategy,
371 #[serde(default, skip_serializing_if = "Option::is_none")]
373 pub reason: Option<RegionFlushReason>,
374}
375
376impl Display for FlushRegions {
377 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
378 write!(
379 f,
380 "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?}, reason={:?})",
381 self.region_ids, self.strategy, self.error_strategy, self.reason
382 )
383 }
384}
385
386impl FlushRegions {
387 pub fn sync_single(region_id: RegionId) -> Self {
389 Self {
390 region_ids: vec![region_id],
391 strategy: FlushStrategy::Sync,
392 error_strategy: FlushErrorStrategy::FailFast,
393 reason: None,
394 }
395 }
396
397 pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
399 Self {
400 region_ids,
401 strategy: FlushStrategy::Async,
402 error_strategy: FlushErrorStrategy::TryAll,
403 reason: None,
404 }
405 }
406
407 pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
409 Self {
410 region_ids,
411 strategy: FlushStrategy::Sync,
412 error_strategy,
413 reason: None,
414 }
415 }
416
417 pub fn with_reason(mut self, reason: RegionFlushReason) -> Self {
418 self.reason = Some(reason);
419 self
420 }
421
422 pub fn is_single_region(&self) -> bool {
424 self.region_ids.len() == 1
425 }
426
427 pub fn single_region_id(&self) -> Option<RegionId> {
429 if self.is_single_region() {
430 self.region_ids.first().copied()
431 } else {
432 None
433 }
434 }
435
436 pub fn is_hint(&self) -> bool {
438 matches!(self.strategy, FlushStrategy::Async)
439 }
440
441 pub fn is_sync(&self) -> bool {
443 matches!(self.strategy, FlushStrategy::Sync)
444 }
445}
446
447impl From<RegionId> for FlushRegions {
448 fn from(region_id: RegionId) -> Self {
449 Self::sync_single(region_id)
450 }
451}
452
453#[derive(Debug, Deserialize)]
454#[serde(untagged)]
455enum SingleOrMultiple<T> {
456 Single(T),
457 Multiple(Vec<T>),
458}
459
460fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
461where
462 D: Deserializer<'de>,
463 T: Deserialize<'de>,
464{
465 let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
466 Ok(match helper {
467 SingleOrMultiple::Single(x) => vec![x],
468 SingleOrMultiple::Multiple(xs) => xs,
469 })
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
474pub struct GetFileRefs {
475 pub query_regions: Vec<RegionId>,
477 pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
482}
483
484impl Display for GetFileRefs {
485 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
486 write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
487 }
488}
489
490#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
492pub struct GcRegions {
493 pub regions: Vec<RegionId>,
495 pub file_refs_manifest: FileRefsManifest,
497 pub full_file_listing: bool,
499}
500
501impl Display for GcRegions {
502 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
503 write!(
504 f,
505 "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
506 self.regions,
507 self.file_refs_manifest.file_refs.len(),
508 self.full_file_listing
509 )
510 }
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
515pub struct GetFileRefsReply {
516 pub file_refs_manifest: FileRefsManifest,
518 pub success: bool,
520 pub error: Option<String>,
522}
523
524impl Display for GetFileRefsReply {
525 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
526 write!(
527 f,
528 "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
529 self.success,
530 self.file_refs_manifest.file_refs.len(),
531 self.error
532 )
533 }
534}
535
536#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
538pub struct GcRegionsReply {
539 pub result: Result<GcReport, String>,
540}
541
542impl Display for GcRegionsReply {
543 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
544 write!(
545 f,
546 "GcReply(result={})",
547 match &self.result {
548 Ok(report) => format!(
549 "GcReport(deleted_files_count={}, need_retry_regions_count={})",
550 report.deleted_files.len(),
551 report.need_retry_regions.len()
552 ),
553 Err(err) => format!("Err({})", err),
554 }
555 )
556 }
557}
558
559#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
560pub struct EnterStagingRegion {
561 pub region_id: RegionId,
562 #[serde(
563 alias = "partition_expr",
564 deserialize_with = "deserialize_enter_staging_partition_directive",
565 serialize_with = "serialize_enter_staging_partition_directive"
566 )]
567 pub partition_directive: StagingPartitionDirective,
568}
569
570impl Display for EnterStagingRegion {
571 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
572 write!(
573 f,
574 "EnterStagingRegion(region_id={}, partition_directive={})",
575 self.region_id, self.partition_directive
576 )
577 }
578}
579
580#[derive(Debug, Clone, PartialEq, Eq)]
581pub enum StagingPartitionDirective {
582 UpdatePartitionExpr(String),
583 RejectAllWrites,
584}
585
586impl StagingPartitionDirective {
587 pub fn as_partition_expr(&self) -> Option<&str> {
589 match self {
590 Self::UpdatePartitionExpr(expr) => Some(expr),
591 Self::RejectAllWrites => None,
592 }
593 }
594}
595
596impl Display for StagingPartitionDirective {
597 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
598 match self {
599 Self::UpdatePartitionExpr(expr) => write!(f, "UpdatePartitionExpr({})", expr),
600 Self::RejectAllWrites => write!(f, "RejectAllWrites"),
601 }
602 }
603}
604
605fn serialize_enter_staging_partition_directive<S>(
606 rule: &StagingPartitionDirective,
607 serializer: S,
608) -> std::result::Result<S::Ok, S::Error>
609where
610 S: Serializer,
611{
612 match rule {
613 StagingPartitionDirective::UpdatePartitionExpr(expr) => serializer.serialize_str(expr),
614 StagingPartitionDirective::RejectAllWrites => {
615 #[derive(Serialize)]
616 struct RejectAllWritesSer<'a> {
617 r#type: &'a str,
618 }
619
620 RejectAllWritesSer {
621 r#type: "reject_all_writes",
622 }
623 .serialize(serializer)
624 }
625 }
626}
627
628fn deserialize_enter_staging_partition_directive<'de, D>(
629 deserializer: D,
630) -> std::result::Result<StagingPartitionDirective, D::Error>
631where
632 D: Deserializer<'de>,
633{
634 #[derive(Deserialize)]
635 #[serde(untagged)]
636 enum Compat {
637 Legacy(String),
638 TypeTagged { r#type: String },
639 }
640
641 match Compat::deserialize(deserializer)? {
642 Compat::Legacy(expr) => Ok(StagingPartitionDirective::UpdatePartitionExpr(expr)),
643 Compat::TypeTagged { r#type } if r#type == "reject_all_writes" => {
644 Ok(StagingPartitionDirective::RejectAllWrites)
645 }
646 Compat::TypeTagged { r#type } => Err(serde::de::Error::custom(format!(
647 "Unknown enter staging partition directive type: {}",
648 r#type
649 ))),
650 }
651}
652
653#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
655pub struct SyncRegion {
656 pub region_id: RegionId,
658 pub request: SyncRegionFromRequest,
660}
661
662impl Display for SyncRegion {
663 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
664 write!(
665 f,
666 "SyncRegion(region_id={}, request={:?})",
667 self.region_id, self.request
668 )
669 }
670}
671
672#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
673pub struct RemapManifest {
674 pub region_id: RegionId,
675 pub input_regions: Vec<RegionId>,
677 pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
679 pub new_partition_exprs: HashMap<RegionId, String>,
681}
682
683impl Display for RemapManifest {
684 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
685 write!(
686 f,
687 "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
688 self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
689 )
690 }
691}
692
693#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
694pub struct ApplyStagingManifest {
695 pub region_id: RegionId,
697 pub partition_expr: String,
699 pub central_region_id: RegionId,
701 pub manifest_path: String,
703}
704
705impl Display for ApplyStagingManifest {
706 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
707 write!(
708 f,
709 "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
710 self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
711 )
712 }
713}
714
715#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
716pub enum Instruction {
717 #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
719 OpenRegions(Vec<OpenRegion>),
720 #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
722 CloseRegions(Vec<RegionIdent>),
723 #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
725 UpgradeRegions(Vec<UpgradeRegion>),
726 #[serde(
727 deserialize_with = "single_or_multiple_from",
728 alias = "DowngradeRegion"
729 )]
730 DowngradeRegions(Vec<DowngradeRegion>),
732 InvalidateCaches(Vec<CacheIdent>),
734 FlushRegions(FlushRegions),
736 GetFileRefs(GetFileRefs),
738 GcRegions(GcRegions),
740 Suspend,
742 EnterStagingRegions(Vec<EnterStagingRegion>),
744 SyncRegions(Vec<SyncRegion>),
746 RemapManifest(RemapManifest),
748
749 ApplyStagingManifests(Vec<ApplyStagingManifest>),
751}
752
753impl Instruction {
754 pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
756 match self {
757 Self::OpenRegions(open_regions) => Some(open_regions),
758 _ => None,
759 }
760 }
761
762 pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
764 match self {
765 Self::CloseRegions(close_regions) => Some(close_regions),
766 _ => None,
767 }
768 }
769
770 pub fn into_flush_regions(self) -> Option<FlushRegions> {
772 match self {
773 Self::FlushRegions(flush_regions) => Some(flush_regions),
774 _ => None,
775 }
776 }
777
778 pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
780 match self {
781 Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
782 _ => None,
783 }
784 }
785
786 pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
788 match self {
789 Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
790 _ => None,
791 }
792 }
793
794 pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
795 match self {
796 Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
797 _ => None,
798 }
799 }
800
801 pub fn into_gc_regions(self) -> Option<GcRegions> {
802 match self {
803 Self::GcRegions(gc_regions) => Some(gc_regions),
804 _ => None,
805 }
806 }
807
808 pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
809 match self {
810 Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
811 _ => None,
812 }
813 }
814
815 pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
816 match self {
817 Self::SyncRegions(sync_regions) => Some(sync_regions),
818 _ => None,
819 }
820 }
821}
822
823#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
825pub struct UpgradeRegionReply {
826 #[serde(default)]
829 pub region_id: RegionId,
830 pub ready: bool,
832 pub exists: bool,
834 pub error: Option<String>,
836}
837
838impl Display for UpgradeRegionReply {
839 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
840 write!(
841 f,
842 "(ready={}, exists={}, error={:?})",
843 self.ready, self.exists, self.error
844 )
845 }
846}
847
848#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
849pub struct DowngradeRegionsReply {
850 pub replies: Vec<DowngradeRegionReply>,
851}
852
853impl DowngradeRegionsReply {
854 pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
855 Self { replies }
856 }
857
858 pub fn single(reply: DowngradeRegionReply) -> Self {
859 Self::new(vec![reply])
860 }
861}
862
863#[derive(Deserialize)]
864#[serde(untagged)]
865enum DowngradeRegionsCompat {
866 Single(DowngradeRegionReply),
867 Multiple(DowngradeRegionsReply),
868}
869
870fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
871where
872 D: Deserializer<'de>,
873{
874 let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
875 Ok(match helper {
876 DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
877 DowngradeRegionsCompat::Multiple(reply) => reply,
878 })
879}
880
881#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
882pub struct UpgradeRegionsReply {
883 pub replies: Vec<UpgradeRegionReply>,
884}
885
886impl UpgradeRegionsReply {
887 pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
888 Self { replies }
889 }
890
891 pub fn single(reply: UpgradeRegionReply) -> Self {
892 Self::new(vec![reply])
893 }
894}
895
896#[derive(Deserialize)]
897#[serde(untagged)]
898enum UpgradeRegionsCompat {
899 Single(UpgradeRegionReply),
900 Multiple(UpgradeRegionsReply),
901}
902
903fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
904where
905 D: Deserializer<'de>,
906{
907 let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
908 Ok(match helper {
909 UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
910 UpgradeRegionsCompat::Multiple(reply) => reply,
911 })
912}
913
914#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
915pub struct EnterStagingRegionReply {
916 pub region_id: RegionId,
917 pub ready: bool,
919 pub exists: bool,
921 pub error: Option<String>,
923}
924
925#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
926pub struct EnterStagingRegionsReply {
927 pub replies: Vec<EnterStagingRegionReply>,
928}
929
930impl EnterStagingRegionsReply {
931 pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
932 Self { replies }
933 }
934}
935
936#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
938pub struct SyncRegionReply {
939 pub region_id: RegionId,
941 pub ready: bool,
943 pub exists: bool,
945 pub error: Option<String>,
947}
948
949#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
951pub struct SyncRegionsReply {
952 pub replies: Vec<SyncRegionReply>,
953}
954
955impl SyncRegionsReply {
956 pub fn new(replies: Vec<SyncRegionReply>) -> Self {
957 Self { replies }
958 }
959}
960
961#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
962pub struct RemapManifestReply {
963 pub exists: bool,
965 pub manifest_paths: HashMap<RegionId, String>,
967 pub error: Option<String>,
969}
970
971impl Display for RemapManifestReply {
972 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
973 write!(
974 f,
975 "RemapManifestReply(manifest_paths={:?}, error={:?})",
976 self.manifest_paths, self.error
977 )
978 }
979}
980
981#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
982pub struct ApplyStagingManifestsReply {
983 pub replies: Vec<ApplyStagingManifestReply>,
984}
985
986impl ApplyStagingManifestsReply {
987 pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
988 Self { replies }
989 }
990}
991
992#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
993pub struct ApplyStagingManifestReply {
994 pub region_id: RegionId,
995 pub ready: bool,
997 pub exists: bool,
999 pub error: Option<String>,
1001}
1002
1003#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1004#[serde(tag = "type", rename_all = "snake_case")]
1005pub enum InstructionReply {
1006 #[serde(alias = "open_region")]
1007 OpenRegions(SimpleReply),
1008 #[serde(alias = "close_region")]
1009 CloseRegions(SimpleReply),
1010 #[serde(
1011 deserialize_with = "upgrade_regions_compat_from",
1012 alias = "upgrade_region"
1013 )]
1014 UpgradeRegions(UpgradeRegionsReply),
1015 #[serde(
1016 alias = "downgrade_region",
1017 deserialize_with = "downgrade_regions_compat_from"
1018 )]
1019 DowngradeRegions(DowngradeRegionsReply),
1020 FlushRegions(FlushRegionReply),
1021 GetFileRefs(GetFileRefsReply),
1022 GcRegions(GcRegionsReply),
1023 EnterStagingRegions(EnterStagingRegionsReply),
1024 SyncRegions(SyncRegionsReply),
1025 RemapManifest(RemapManifestReply),
1026 ApplyStagingManifests(ApplyStagingManifestsReply),
1027}
1028
1029impl Display for InstructionReply {
1030 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1031 match self {
1032 Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
1033 Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
1034 Self::UpgradeRegions(reply) => {
1035 write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
1036 }
1037 Self::DowngradeRegions(reply) => {
1038 write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
1039 }
1040 Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
1041 Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
1042 Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
1043 Self::EnterStagingRegions(reply) => {
1044 write!(
1045 f,
1046 "InstructionReply::EnterStagingRegions({:?})",
1047 reply.replies
1048 )
1049 }
1050 Self::SyncRegions(reply) => {
1051 write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
1052 }
1053 Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
1054 Self::ApplyStagingManifests(reply) => write!(
1055 f,
1056 "InstructionReply::ApplyStagingManifests({:?})",
1057 reply.replies
1058 ),
1059 }
1060 }
1061}
1062
1063#[cfg(any(test, feature = "testing"))]
1064impl InstructionReply {
1065 pub fn expect_close_regions_reply(self) -> SimpleReply {
1066 match self {
1067 Self::CloseRegions(reply) => reply,
1068 _ => panic!("Expected CloseRegions reply"),
1069 }
1070 }
1071
1072 pub fn expect_open_regions_reply(self) -> SimpleReply {
1073 match self {
1074 Self::OpenRegions(reply) => reply,
1075 _ => panic!("Expected OpenRegions reply"),
1076 }
1077 }
1078
1079 pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
1080 match self {
1081 Self::UpgradeRegions(reply) => reply.replies,
1082 _ => panic!("Expected UpgradeRegion reply"),
1083 }
1084 }
1085
1086 pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
1087 match self {
1088 Self::DowngradeRegions(reply) => reply.replies,
1089 _ => panic!("Expected DowngradeRegion reply"),
1090 }
1091 }
1092
1093 pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
1094 match self {
1095 Self::FlushRegions(reply) => reply,
1096 _ => panic!("Expected FlushRegions reply"),
1097 }
1098 }
1099
1100 pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
1101 match self {
1102 Self::EnterStagingRegions(reply) => reply.replies,
1103 _ => panic!("Expected EnterStagingRegion reply"),
1104 }
1105 }
1106
1107 pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
1108 match self {
1109 Self::SyncRegions(reply) => reply.replies,
1110 _ => panic!("Expected SyncRegion reply"),
1111 }
1112 }
1113
1114 pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
1115 match self {
1116 Self::RemapManifest(reply) => reply,
1117 _ => panic!("Expected RemapManifest reply"),
1118 }
1119 }
1120
1121 pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
1122 match self {
1123 Self::ApplyStagingManifests(reply) => reply.replies,
1124 _ => panic!("Expected ApplyStagingManifest reply"),
1125 }
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use std::collections::HashSet;
1132
1133 use common_wal::options::WalOptions;
1134 use store_api::storage::{FileId, FileRef};
1135
1136 use super::*;
1137
1138 #[test]
1139 fn test_serialize_instruction() {
1140 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1141 RegionIdent {
1142 datanode_id: 2,
1143 table_id: 1024,
1144 region_number: 1,
1145 engine: "mito2".to_string(),
1146 },
1147 "test/foo",
1148 HashMap::new(),
1149 HashMap::new(),
1150 false,
1151 None,
1152 RegionRequirements::empty(),
1153 )]);
1154
1155 let serialized = serde_json::to_string(&open_region).unwrap();
1156 assert_eq!(
1157 r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false,"requirements":{"object_storage":false}}]}"#,
1158 serialized
1159 );
1160
1161 let close_region = Instruction::CloseRegions(vec![RegionIdent {
1162 datanode_id: 2,
1163 table_id: 1024,
1164 region_number: 1,
1165 engine: "mito2".to_string(),
1166 }]);
1167
1168 let serialized = serde_json::to_string(&close_region).unwrap();
1169 assert_eq!(
1170 r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
1171 serialized
1172 );
1173
1174 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1175 region_id: RegionId::new(1024, 1),
1176 last_entry_id: None,
1177 metadata_last_entry_id: None,
1178 replay_timeout: Duration::from_millis(1000),
1179 location_id: None,
1180 replay_entry_id: None,
1181 metadata_replay_entry_id: None,
1182 }]);
1183
1184 let serialized = serde_json::to_string(&upgrade_region).unwrap();
1185 assert_eq!(
1186 r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
1187 serialized
1188 );
1189 }
1190
1191 #[test]
1192 fn test_serialize_instruction_reply() {
1193 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1194 DowngradeRegionsReply::single(DowngradeRegionReply {
1195 region_id: RegionId::new(1024, 1),
1196 last_entry_id: None,
1197 metadata_last_entry_id: None,
1198 exists: true,
1199 error: None,
1200 }),
1201 );
1202
1203 let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
1204 assert_eq!(
1205 r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
1206 serialized
1207 );
1208
1209 let upgrade_region_reply =
1210 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1211 region_id: RegionId::new(1024, 1),
1212 ready: true,
1213 exists: true,
1214 error: None,
1215 }));
1216 let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
1217 assert_eq!(
1218 r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
1219 serialized
1220 );
1221 }
1222
1223 #[test]
1224 fn test_deserialize_instruction() {
1225 let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
1227 let open_region_instruction: Instruction =
1228 serde_json::from_str(open_region_instruction).unwrap();
1229 let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1230 RegionIdent {
1231 datanode_id: 2,
1232 table_id: 1024,
1233 region_number: 1,
1234 engine: "mito2".to_string(),
1235 },
1236 "test/foo",
1237 HashMap::new(),
1238 HashMap::new(),
1239 false,
1240 None,
1241 RegionRequirements::empty(),
1242 )]);
1243 assert_eq!(open_region_instruction, open_region);
1244
1245 let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
1247 let close_region_instruction: Instruction =
1248 serde_json::from_str(close_region_instruction).unwrap();
1249 let close_region = Instruction::CloseRegions(vec![RegionIdent {
1250 datanode_id: 2,
1251 table_id: 1024,
1252 region_number: 1,
1253 engine: "mito2".to_string(),
1254 }]);
1255 assert_eq!(close_region_instruction, close_region);
1256
1257 let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
1259 let downgrade_region_instruction: Instruction =
1260 serde_json::from_str(downgrade_region_instruction).unwrap();
1261 let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
1262 region_id: RegionId::new(1024, 1),
1263 flush_timeout: Some(Duration::from_millis(1000)),
1264 }]);
1265 assert_eq!(downgrade_region_instruction, downgrade_region);
1266
1267 let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
1269 let upgrade_region_instruction: Instruction =
1270 serde_json::from_str(upgrade_region_instruction).unwrap();
1271 let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1272 region_id: RegionId::new(1024, 1),
1273 last_entry_id: None,
1274 metadata_last_entry_id: None,
1275 replay_timeout: Duration::from_millis(1000),
1276 location_id: None,
1277 replay_entry_id: None,
1278 metadata_replay_entry_id: None,
1279 }]);
1280 assert_eq!(upgrade_region_instruction, upgrade_region);
1281 }
1282
1283 #[test]
1284 fn test_deserialize_instruction_reply() {
1285 let close_region_instruction_reply =
1287 r#"{"result":true,"error":null,"type":"close_region"}"#;
1288 let close_region_instruction_reply: InstructionReply =
1289 serde_json::from_str(close_region_instruction_reply).unwrap();
1290 let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
1291 result: true,
1292 error: None,
1293 });
1294 assert_eq!(close_region_instruction_reply, close_region_reply);
1295
1296 let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
1298 let open_region_instruction_reply: InstructionReply =
1299 serde_json::from_str(open_region_instruction_reply).unwrap();
1300 let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1301 result: true,
1302 error: None,
1303 });
1304 assert_eq!(open_region_instruction_reply, open_region_reply);
1305
1306 let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1308 let downgrade_region_instruction_reply: InstructionReply =
1309 serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1310 let downgrade_region_reply = InstructionReply::DowngradeRegions(
1311 DowngradeRegionsReply::single(DowngradeRegionReply {
1312 region_id: RegionId::new(1024, 1),
1313 last_entry_id: None,
1314 metadata_last_entry_id: None,
1315 exists: true,
1316 error: None,
1317 }),
1318 );
1319 assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1320
1321 let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1323 let upgrade_region_instruction_reply: InstructionReply =
1324 serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1325 let upgrade_region_reply =
1326 InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1327 region_id: RegionId::new(1024, 1),
1328 ready: true,
1329 exists: true,
1330 error: None,
1331 }));
1332 assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1333 }
1334
1335 #[test]
1336 fn test_enter_staging_partition_rule_compatibility() {
1337 let legacy = r#"{"region_id":4398046511105,"partition_expr":"{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"}"#;
1338 let enter: EnterStagingRegion = serde_json::from_str(legacy).unwrap();
1339 assert_eq!(enter.region_id, RegionId::new(1024, 1));
1340 assert_eq!(
1341 enter.partition_directive,
1342 StagingPartitionDirective::UpdatePartitionExpr(
1343 "{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"
1344 .to_string()
1345 )
1346 );
1347
1348 let serialized = serde_json::to_string(&enter).unwrap();
1349 assert!(serialized.contains("\"partition_directive\":\""));
1350 assert!(!serialized.contains("partition_expr"));
1351
1352 let reject = r#"{"region_id":4398046511105,"partition_expr":{"type":"reject_all_writes"}}"#;
1353 let enter: EnterStagingRegion = serde_json::from_str(reject).unwrap();
1354 assert_eq!(
1355 enter.partition_directive,
1356 StagingPartitionDirective::RejectAllWrites
1357 );
1358 }
1359
1360 #[derive(Debug, Clone, Serialize, Deserialize)]
1361 struct LegacyOpenRegion {
1362 region_ident: RegionIdent,
1363 region_storage_path: String,
1364 region_options: HashMap<String, String>,
1365 }
1366
1367 #[test]
1368 fn test_compatible_serialize_open_region() {
1369 let region_ident = RegionIdent {
1370 datanode_id: 2,
1371 table_id: 1024,
1372 region_number: 1,
1373 engine: "mito2".to_string(),
1374 };
1375 let region_storage_path = "test/foo".to_string();
1376 let region_options = HashMap::from([
1377 ("a".to_string(), "aa".to_string()),
1378 ("b".to_string(), "bb".to_string()),
1379 ]);
1380
1381 let legacy_open_region = LegacyOpenRegion {
1383 region_ident: region_ident.clone(),
1384 region_storage_path: region_storage_path.clone(),
1385 region_options: region_options.clone(),
1386 };
1387 let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1388
1389 let deserialized = serde_json::from_str(&serialized).unwrap();
1391 let expected = OpenRegion {
1392 region_ident,
1393 region_storage_path,
1394 region_options,
1395 region_wal_options: HashMap::new(),
1396 skip_wal_replay: false,
1397 reason: None,
1398 requirements: RegionRequirements::empty(),
1399 };
1400 assert_eq!(expected, deserialized);
1401 }
1402
1403 #[test]
1404 fn test_deserialize_open_region_with_legacy_region_wal_options() {
1405 let open_region = r#"{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{"1":"{\"wal.provider\":\"raft_engine\"}"},"skip_wal_replay":false}"#;
1406
1407 let open_region: OpenRegion = serde_json::from_str(open_region).unwrap();
1408
1409 assert_eq!(
1410 open_region.region_wal_options,
1411 HashMap::from([(1, WalOptions::RaftEngine)])
1412 );
1413 }
1414
1415 #[test]
1416 fn test_serialize_open_region_with_reason_and_requirements() {
1417 let open_region = OpenRegion::new(
1418 RegionIdent {
1419 datanode_id: 2,
1420 table_id: 1024,
1421 region_number: 1,
1422 engine: "mito2".to_string(),
1423 },
1424 "test/foo",
1425 HashMap::new(),
1426 HashMap::new(),
1427 false,
1428 Some(OpenRegionReason::RegionMigration),
1429 RegionRequirements::object_storage(),
1430 );
1431
1432 let serialized = serde_json::to_string(&open_region).unwrap();
1433 assert!(serialized.contains(r#""reason":"RegionMigration""#));
1434 assert!(serialized.contains(r#""object_storage":true"#));
1435
1436 let deserialized: OpenRegion = serde_json::from_str(&serialized).unwrap();
1437 assert_eq!(Some(OpenRegionReason::RegionMigration), deserialized.reason);
1438 assert_eq!(
1439 RegionRequirements::object_storage(),
1440 deserialized.requirements
1441 );
1442 }
1443
1444 #[test]
1445 fn test_flush_regions_creation() {
1446 let region_id = RegionId::new(1024, 1);
1447
1448 let single_sync = FlushRegions::sync_single(region_id);
1450 assert_eq!(single_sync.region_ids, vec![region_id]);
1451 assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1452 assert!(!single_sync.is_hint());
1453 assert!(single_sync.is_sync());
1454 assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1455 assert_eq!(single_sync.reason, None);
1456 assert!(single_sync.is_single_region());
1457 assert_eq!(single_sync.single_region_id(), Some(region_id));
1458
1459 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1461 let batch_async = FlushRegions::async_batch(region_ids.clone());
1462 assert_eq!(batch_async.region_ids, region_ids);
1463 assert_eq!(batch_async.strategy, FlushStrategy::Async);
1464 assert!(batch_async.is_hint());
1465 assert!(!batch_async.is_sync());
1466 assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1467 assert_eq!(batch_async.reason, None);
1468 assert!(!batch_async.is_single_region());
1469 assert_eq!(batch_async.single_region_id(), None);
1470
1471 let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1473 assert_eq!(batch_sync.region_ids, region_ids);
1474 assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1475 assert!(!batch_sync.is_hint());
1476 assert!(batch_sync.is_sync());
1477 assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1478 assert_eq!(batch_sync.reason, None);
1479
1480 let with_reason = batch_sync.with_reason(RegionFlushReason::RemoteWalPrune);
1481 assert_eq!(with_reason.reason, Some(RegionFlushReason::RemoteWalPrune));
1482 }
1483
1484 #[test]
1485 fn test_flush_regions_conversion() {
1486 let region_id = RegionId::new(1024, 1);
1487
1488 let from_region_id: FlushRegions = region_id.into();
1489 assert_eq!(from_region_id.region_ids, vec![region_id]);
1490 assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1491 assert!(!from_region_id.is_hint());
1492 assert!(from_region_id.is_sync());
1493
1494 let flush_regions = FlushRegions {
1496 region_ids: vec![region_id],
1497 strategy: FlushStrategy::Async,
1498 error_strategy: FlushErrorStrategy::TryAll,
1499 reason: None,
1500 };
1501 assert_eq!(flush_regions.region_ids, vec![region_id]);
1502 assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1503 assert!(flush_regions.is_hint());
1504 assert!(!flush_regions.is_sync());
1505 }
1506
1507 #[test]
1508 fn test_flush_region_reply() {
1509 let region_id = RegionId::new(1024, 1);
1510
1511 let success_reply = FlushRegionReply::success_single(region_id);
1513 assert!(success_reply.overall_success);
1514 assert_eq!(success_reply.results.len(), 1);
1515 assert_eq!(success_reply.results[0].0, region_id);
1516 assert!(success_reply.results[0].1.is_ok());
1517
1518 let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1520 assert!(!error_reply.overall_success);
1521 assert_eq!(error_reply.results.len(), 1);
1522 assert_eq!(error_reply.results[0].0, region_id);
1523 assert!(error_reply.results[0].1.is_err());
1524
1525 let region_id2 = RegionId::new(1024, 2);
1527 let results = vec![
1528 (region_id, Ok(())),
1529 (region_id2, Err("flush failed".to_string())),
1530 ];
1531 let batch_reply = FlushRegionReply::from_results(results);
1532 assert!(!batch_reply.overall_success);
1533 assert_eq!(batch_reply.results.len(), 2);
1534
1535 let simple_reply = batch_reply.to_simple_reply();
1537 assert!(!simple_reply.result);
1538 assert!(simple_reply.error.is_some());
1539 assert!(simple_reply.error.unwrap().contains("flush failed"));
1540 }
1541
1542 #[test]
1543 fn test_serialize_flush_regions_instruction() {
1544 let region_id = RegionId::new(1024, 1);
1545 let flush_regions = FlushRegions::sync_single(region_id);
1546 let instruction = Instruction::FlushRegions(flush_regions.clone());
1547
1548 let serialized = serde_json::to_string(&instruction).unwrap();
1549 assert!(!serialized.contains("reason"));
1550 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1551
1552 match deserialized {
1553 Instruction::FlushRegions(fr) => {
1554 assert_eq!(fr.region_ids, vec![region_id]);
1555 assert_eq!(fr.strategy, FlushStrategy::Sync);
1556 assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1557 assert_eq!(fr.reason, None);
1558 }
1559 _ => panic!("Expected FlushRegions instruction"),
1560 }
1561
1562 let legacy = r#"{"FlushRegions":{"region_ids":[4398046511105],"strategy":"Sync","error_strategy":"FailFast"}}"#;
1563 let deserialized: Instruction = serde_json::from_str(legacy).unwrap();
1564 match deserialized {
1565 Instruction::FlushRegions(fr) => {
1566 assert_eq!(fr.region_ids, vec![region_id]);
1567 assert_eq!(fr.strategy, FlushStrategy::Sync);
1568 assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1569 assert_eq!(fr.reason, None);
1570 }
1571 _ => panic!("Expected FlushRegions instruction"),
1572 }
1573
1574 let flush_regions = FlushRegions::async_batch(vec![region_id])
1575 .with_reason(RegionFlushReason::RemoteWalPrune);
1576 let instruction = Instruction::FlushRegions(flush_regions);
1577 let serialized = serde_json::to_string(&instruction).unwrap();
1578 assert!(serialized.contains(r#""reason":"RemoteWalPrune""#));
1579 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1580 match deserialized {
1581 Instruction::FlushRegions(fr) => {
1582 assert_eq!(fr.reason, Some(RegionFlushReason::RemoteWalPrune));
1583 }
1584 _ => panic!("Expected FlushRegions instruction"),
1585 }
1586 }
1587
1588 #[test]
1589 fn test_serialize_flush_regions_batch_instruction() {
1590 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1591 let flush_regions =
1592 FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1593 let instruction = Instruction::FlushRegions(flush_regions);
1594
1595 let serialized = serde_json::to_string(&instruction).unwrap();
1596 let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1597
1598 match deserialized {
1599 Instruction::FlushRegions(fr) => {
1600 assert_eq!(fr.region_ids, region_ids);
1601 assert_eq!(fr.strategy, FlushStrategy::Sync);
1602 assert!(!fr.is_hint());
1603 assert!(fr.is_sync());
1604 assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1605 assert_eq!(fr.reason, None);
1606 }
1607 _ => panic!("Expected FlushRegions instruction"),
1608 }
1609 }
1610
1611 #[test]
1612 fn test_serialize_get_file_refs_instruction_reply() {
1613 let mut manifest = FileRefsManifest::default();
1614 let r0 = RegionId::new(1024, 1);
1615 let r1 = RegionId::new(1024, 2);
1616 manifest.file_refs.insert(
1617 r0,
1618 HashSet::from([FileRef::new(r0, FileId::random(), None)]),
1619 );
1620 manifest.file_refs.insert(
1621 r1,
1622 HashSet::from([FileRef::new(r1, FileId::random(), None)]),
1623 );
1624 manifest.manifest_version.insert(r0, 10);
1625 manifest.manifest_version.insert(r1, 20);
1626
1627 let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1628 file_refs_manifest: manifest,
1629 success: true,
1630 error: None,
1631 });
1632
1633 let serialized = serde_json::to_string(&instruction_reply).unwrap();
1634 let deserialized = serde_json::from_str(&serialized).unwrap();
1635
1636 assert_eq!(instruction_reply, deserialized);
1637 }
1638}