common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
/// Identifies a region together with the datanode and engine it is associated with.
///
/// The `table_id` and `region_number` pair is what `get_region_id` combines
/// into a [RegionId].
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct RegionIdent {
    /// The id of the datanode.
    pub datanode_id: DatanodeId,
    /// The id of the table the region belongs to.
    pub table_id: TableId,
    /// The number of the region within the table.
    pub region_number: RegionNumber,
    /// The engine name (presumably the storage engine of the region — confirm with callers).
    pub engine: String,
}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
/// The result of downgrading a single leader region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
    /// The [RegionId].
    /// For compatibility, it is defaulted to [RegionId::new(0, 0)]
    /// when the field is missing from the serialized reply.
    #[serde(default)]
    pub region_id: RegionId,
    /// Returns the `last_entry_id` if available.
    pub last_entry_id: Option<u64>,
    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
71
72impl Display for DowngradeRegionReply {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        write!(
75            f,
76            "(last_entry_id={:?}, exists={}, error={:?})",
77            self.last_entry_id, self.exists, self.error
78        )
79    }
80}
81
/// A minimal reply carrying a success flag and an optional error message.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
    /// Whether the operation succeeded.
    pub result: bool,
    /// The error message, if any.
    pub error: Option<String>,
}
87
/// Reply for flush region operations with support for batch results.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct FlushRegionReply {
    /// Results for each region that was attempted to be flushed.
    /// For single region flushes, this will contain one result.
    /// For batch flushes, this contains results for all attempted regions.
    /// A failed flush carries its error message in the `Err` variant.
    pub results: Vec<(RegionId, Result<(), String>)>,
    /// Overall success: true if all regions were flushed successfully.
    pub overall_success: bool,
}
98
99impl FlushRegionReply {
100    /// Create a successful single region reply.
101    pub fn success_single(region_id: RegionId) -> Self {
102        Self {
103            results: vec![(region_id, Ok(()))],
104            overall_success: true,
105        }
106    }
107
108    /// Create a failed single region reply.
109    pub fn error_single(region_id: RegionId, error: String) -> Self {
110        Self {
111            results: vec![(region_id, Err(error))],
112            overall_success: false,
113        }
114    }
115
116    /// Create a batch reply from individual results.
117    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118        let overall_success = results.iter().all(|(_, result)| result.is_ok());
119        Self {
120            results,
121            overall_success,
122        }
123    }
124
125    /// Convert to SimpleReply for backward compatibility.
126    pub fn to_simple_reply(&self) -> SimpleReply {
127        if self.overall_success {
128            SimpleReply {
129                result: true,
130                error: None,
131            }
132        } else {
133            let errors: Vec<String> = self
134                .results
135                .iter()
136                .filter_map(|(region_id, result)| {
137                    result
138                        .as_ref()
139                        .err()
140                        .map(|err| format!("{}: {}", region_id, err))
141                })
142                .collect();
143            SimpleReply {
144                result: false,
145                error: Some(errors.join("; ")),
146            }
147        }
148    }
149}
150
151impl Display for SimpleReply {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(f, "(result={}, error={:?})", self.result, self.error)
154    }
155}
156
157impl Display for FlushRegionReply {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        let results_str = self
160            .results
161            .iter()
162            .map(|(region_id, result)| match result {
163                Ok(()) => format!("{}:OK", region_id),
164                Err(err) => format!("{}:ERR({})", region_id, err),
165            })
166            .collect::<Vec<_>>()
167            .join(", ");
168        write!(
169            f,
170            "(overall_success={}, results=[{}])",
171            self.overall_success, results_str
172        )
173    }
174}
175
176impl Display for OpenRegion {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "OpenRegion(region_ident={}, region_storage_path={})",
181            self.region_ident, self.region_storage_path
182        )
183    }
184}
185
/// The payload describing one region to open.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
    /// Identifies the region to open.
    pub region_ident: RegionIdent,
    /// The storage path of the region.
    pub region_storage_path: String,
    /// The region options as string key/value pairs.
    pub region_options: HashMap<String, String>,
    /// WAL options keyed by region number.
    ///
    /// Keys are (de)serialized through their string form (`DisplayFromStr`);
    /// the map defaults to empty when the field is missing.
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
    /// Whether to skip the WAL replay; defaults to `false` when the field is missing.
    #[serde(default)]
    pub skip_wal_replay: bool,
}
198
199impl OpenRegion {
200    pub fn new(
201        region_ident: RegionIdent,
202        path: &str,
203        region_options: HashMap<String, String>,
204        region_wal_options: HashMap<RegionNumber, String>,
205        skip_wal_replay: bool,
206    ) -> Self {
207        Self {
208            region_ident,
209            region_storage_path: path.to_string(),
210            region_options,
211            region_wal_options,
212            skip_wal_replay,
213        }
214    }
215}
216
/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The timeout of waiting for flush the region.
    ///
    /// `None` stands for don't flush before downgrading the region.
    /// A missing field deserializes to `None`.
    #[serde(default)]
    pub flush_timeout: Option<Duration>,
}
228
229impl Display for DowngradeRegion {
230    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231        write!(
232            f,
233            "DowngradeRegion(region_id={}, flush_timeout={:?})",
234            self.region_id, self.flush_timeout,
235        )
236    }
237}
238
/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UpgradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The `last_entry_id` of old leader region.
    pub last_entry_id: Option<u64>,
    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// The timeout of waiting for a wal replay.
    ///
    /// This is a plain [Duration] (not an `Option`), (de)serialized via `humantime_serde`.
    /// NOTE(review): a zero duration presumably means "no wait", which is helpful
    /// to verify whether the leader region is ready — confirm against the handler.
    #[serde(with = "humantime_serde")]
    pub replay_timeout: Duration,
    /// The hint for replaying memtable.
    #[serde(default)]
    pub location_id: Option<u64>,
    /// The replay entry id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay_entry_id: Option<u64>,
    /// The metadata replay entry id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata_replay_entry_id: Option<u64>,
}
262
263impl UpgradeRegion {
264    /// Sets the replay entry id.
265    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266        self.replay_entry_id = replay_entry_id;
267        self
268    }
269
270    /// Sets the metadata replay entry id.
271    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272        self.metadata_replay_entry_id = metadata_replay_entry_id;
273        self
274    }
275}
276
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
/// The identifier of cache.
///
/// Each variant names the kind of key carried by the identifier.
pub enum CacheIdent {
    /// Identified by a flow id.
    FlowId(FlowId),
    /// Indicate change of address of flownode.
    FlowNodeAddressChange(u64),
    /// Identified by a flow name.
    FlowName(FlowName),
    /// Identified by a table id.
    TableId(TableId),
    /// Identified by a table name.
    TableName(TableName),
    /// Identified by a schema name.
    SchemaName(SchemaName),
    /// Carries the details of a created flow.
    CreateFlow(CreateFlow),
    /// Carries the details of a dropped flow.
    DropFlow(DropFlow),
}
290
/// Details of a created flow, carried by [CacheIdent::CreateFlow].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
    /// The unique identifier for the flow.
    pub flow_id: FlowId,
    /// The ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to peer information
    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
299
/// Details of a dropped flow, carried by [CacheIdent::DropFlow].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
    /// The unique identifier for the flow.
    pub flow_id: FlowId,
    /// The ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to flownode id
    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
307
/// Strategy for executing flush operations.
///
/// Defaults to [FlushStrategy::Sync].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushStrategy {
    /// Synchronous operation that waits for completion and expects a reply
    #[default]
    Sync,
    /// Asynchronous hint operation (fire-and-forget, no reply expected)
    Async,
}
317
/// Error handling strategy for batch flush operations.
///
/// Defaults to [FlushErrorStrategy::FailFast].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushErrorStrategy {
    /// Abort on first error (fail-fast)
    #[default]
    FailFast,
    /// Attempt to flush all regions and collect all errors
    TryAll,
}
327
/// Unified flush instruction supporting both single and batch operations
/// with configurable execution strategies and error handling.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
    /// List of region IDs to flush. Can contain a single region or multiple regions.
    pub region_ids: Vec<RegionId>,
    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
    /// Falls back to the default strategy when the field is missing.
    #[serde(default)]
    pub strategy: FlushStrategy,
    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
    /// Falls back to the default error strategy when the field is missing.
    #[serde(default)]
    pub error_strategy: FlushErrorStrategy,
}
341
342impl Display for FlushRegions {
343    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
344        write!(
345            f,
346            "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
347            self.region_ids, self.strategy, self.error_strategy
348        )
349    }
350}
351
352impl FlushRegions {
353    /// Create synchronous single-region flush
354    pub fn sync_single(region_id: RegionId) -> Self {
355        Self {
356            region_ids: vec![region_id],
357            strategy: FlushStrategy::Sync,
358            error_strategy: FlushErrorStrategy::FailFast,
359        }
360    }
361
362    /// Create asynchronous batch flush (fire-and-forget)
363    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
364        Self {
365            region_ids,
366            strategy: FlushStrategy::Async,
367            error_strategy: FlushErrorStrategy::TryAll,
368        }
369    }
370
371    /// Create synchronous batch flush with error strategy
372    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
373        Self {
374            region_ids,
375            strategy: FlushStrategy::Sync,
376            error_strategy,
377        }
378    }
379
380    /// Check if this is a single region flush.
381    pub fn is_single_region(&self) -> bool {
382        self.region_ids.len() == 1
383    }
384
385    /// Get the single region ID if this is a single region flush.
386    pub fn single_region_id(&self) -> Option<RegionId> {
387        if self.is_single_region() {
388            self.region_ids.first().copied()
389        } else {
390            None
391        }
392    }
393
394    /// Check if this is a hint (asynchronous) operation.
395    pub fn is_hint(&self) -> bool {
396        matches!(self.strategy, FlushStrategy::Async)
397    }
398
399    /// Check if this is a synchronous operation.
400    pub fn is_sync(&self) -> bool {
401        matches!(self.strategy, FlushStrategy::Sync)
402    }
403}
404
405impl From<RegionId> for FlushRegions {
406    fn from(region_id: RegionId) -> Self {
407        Self::sync_single(region_id)
408    }
409}
410
/// Deserialization helper accepting either one `T` or a list of `T`.
///
/// Being `untagged`, serde tries the variants in declaration order:
/// first a single value, then a list.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A single value.
    Single(T),
    /// A list of values.
    Multiple(Vec<T>),
}
417
418fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
419where
420    D: Deserializer<'de>,
421    T: Deserialize<'de>,
422{
423    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
424    Ok(match helper {
425        SingleOrMultiple::Single(x) => vec![x],
426        SingleOrMultiple::Multiple(xs) => xs,
427    })
428}
429
/// Instruction to get file references for specified regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
    /// List of region IDs to get file references from active FileHandles (in-memory).
    pub query_regions: Vec<RegionId>,
    /// Mapping from the source region ID (where to read the manifest) to
    /// the target region IDs (whose file references to look for).
    ///
    /// Key: The region ID of the manifest.
    /// Value: The list of region IDs to find references for in that manifest.
    pub related_regions: HashMap<RegionId, Vec<RegionId>>,
}
441
442impl Display for GetFileRefs {
443    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
444        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
445    }
446}
447
/// Instruction to trigger garbage collection for a set of regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
    /// The region IDs to perform GC on. Only regions that are currently on the
    /// given datanode can be garbage collected; regions not on the datanode
    /// will report errors.
    pub regions: Vec<RegionId>,
    /// The file references manifest containing temporary file references.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether to perform a full file listing to find orphan files.
    pub full_file_listing: bool,
}
458
459impl Display for GcRegions {
460    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
461        write!(
462            f,
463            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
464            self.regions,
465            self.file_refs_manifest.file_refs.len(),
466            self.full_file_listing
467        )
468    }
469}
470
/// Reply for the [GetFileRefs] instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefsReply {
    /// The file references manifest.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether the operation was successful.
    pub success: bool,
    /// Error message if any.
    pub error: Option<String>,
}
481
482impl Display for GetFileRefsReply {
483    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
484        write!(
485            f,
486            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
487            self.success,
488            self.file_refs_manifest.file_refs.len(),
489            self.error
490        )
491    }
492}
493
/// Reply for GC instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegionsReply {
    /// The [GcReport] on success, or an error message on failure.
    pub result: Result<GcReport, String>,
}
499
500impl Display for GcRegionsReply {
501    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
502        write!(
503            f,
504            "GcReply(result={})",
505            match &self.result {
506                Ok(report) => format!(
507                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
508                    report.deleted_files.len(),
509                    report.need_retry_regions.len()
510                ),
511                Err(err) => format!("Err({})", err),
512            }
513        )
514    }
515}
516
/// Describes one region that should enter the staging state.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EnterStagingRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The partition expression, carried as a string
    /// (presumably the new rule for the staging region — confirm with callers).
    pub partition_expr: String,
}
522
523impl Display for EnterStagingRegion {
524    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
525        write!(
526            f,
527            "EnterStagingRegion(region_id={}, partition_expr={})",
528            self.region_id, self.partition_expr
529        )
530    }
531}
532
/// Instruction payload for remapping region manifests.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
    /// The [RegionId] the instruction is addressed to
    /// (presumably the region executing the remap — confirm with the handler).
    pub region_id: RegionId,
    /// Regions to remap manifests from.
    pub input_regions: Vec<RegionId>,
    /// For each old region, which new regions should receive its files
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub new_partition_exprs: HashMap<RegionId, String>,
}
543
544impl Display for RemapManifest {
545    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
546        write!(
547            f,
548            "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
549            self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
550        )
551    }
552}
553
/// Describes one staging manifest to apply to a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApplyStagingManifest {
    /// The region ID to apply the staging manifest to.
    pub region_id: RegionId,
    /// The partition expression of the staging region.
    pub partition_expr: String,
    /// The region that stores the staging manifests in its staging blob storage.
    pub central_region_id: RegionId,
    /// The relative path to the staging manifest within the central region's staging blob storage.
    pub manifest_path: String,
}
565
566impl Display for ApplyStagingManifest {
567    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
568        write!(
569            f,
570            "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
571            self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
572        )
573    }
574}
575
/// The set of instructions defined in this module.
///
/// Several variants carry a `serde` alias with the singular name (e.g. `OpenRegion`)
/// and use `single_or_multiple_from`, so they deserialize from either a single
/// payload or a list of payloads. They always serialize under the plural name.
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
    /// Opens regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
    OpenRegions(Vec<OpenRegion>),
    /// Closes regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
    CloseRegions(Vec<RegionIdent>),
    /// Upgrades regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
    UpgradeRegions(Vec<UpgradeRegion>),
    /// Downgrades regions.
    #[serde(
        deserialize_with = "single_or_multiple_from",
        alias = "DowngradeRegion"
    )]
    DowngradeRegions(Vec<DowngradeRegion>),
    /// Invalidates batch cache.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegions(FlushRegions),
    /// Gets file references for regions.
    GetFileRefs(GetFileRefs),
    /// Triggers garbage collection for a set of regions.
    GcRegions(GcRegions),
    /// Temporary suspend serving reads or writes
    Suspend,
    /// Makes regions enter staging state.
    EnterStagingRegions(Vec<EnterStagingRegion>),
    /// Remaps manifests for a region.
    RemapManifest(RemapManifest),
    /// Applies staging manifests for a region.
    ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
610
611impl Instruction {
612    /// Converts the instruction into a vector of [OpenRegion].
613    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
614        match self {
615            Self::OpenRegions(open_regions) => Some(open_regions),
616            _ => None,
617        }
618    }
619
620    /// Converts the instruction into a vector of [RegionIdent].
621    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
622        match self {
623            Self::CloseRegions(close_regions) => Some(close_regions),
624            _ => None,
625        }
626    }
627
628    /// Converts the instruction into a [FlushRegions].
629    pub fn into_flush_regions(self) -> Option<FlushRegions> {
630        match self {
631            Self::FlushRegions(flush_regions) => Some(flush_regions),
632            _ => None,
633        }
634    }
635
636    /// Converts the instruction into a [DowngradeRegion].
637    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
638        match self {
639            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
640            _ => None,
641        }
642    }
643
644    /// Converts the instruction into a [UpgradeRegion].
645    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
646        match self {
647            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
648            _ => None,
649        }
650    }
651
652    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
653        match self {
654            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
655            _ => None,
656        }
657    }
658
659    pub fn into_gc_regions(self) -> Option<GcRegions> {
660        match self {
661            Self::GcRegions(gc_regions) => Some(gc_regions),
662            _ => None,
663        }
664    }
665
666    pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
667        match self {
668            Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
669            _ => None,
670        }
671    }
672}
673
/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
    /// The [RegionId].
    /// For compatibility, it is defaulted to [RegionId::new(0, 0)]
    /// when the field is missing from the serialized reply.
    #[serde(default)]
    pub region_id: RegionId,
    /// Returns true if `last_entry_id` has been replayed to the latest.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Returns error if any.
    pub error: Option<String>,
}
688
689impl Display for UpgradeRegionReply {
690    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
691        write!(
692            f,
693            "(ready={}, exists={}, error={:?})",
694            self.ready, self.exists, self.error
695        )
696    }
697}
698
/// A batch of [DowngradeRegionReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionsReply {
    /// The individual replies.
    pub replies: Vec<DowngradeRegionReply>,
}
703
704impl DowngradeRegionsReply {
705    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
706        Self { replies }
707    }
708
709    pub fn single(reply: DowngradeRegionReply) -> Self {
710        Self::new(vec![reply])
711    }
712}
713
/// Deserialization helper accepting either a single [DowngradeRegionReply]
/// or a batch [DowngradeRegionsReply].
///
/// Being `untagged`, serde tries the variants in declaration order.
#[derive(Deserialize)]
#[serde(untagged)]
enum DowngradeRegionsCompat {
    /// A single reply.
    Single(DowngradeRegionReply),
    /// A batch reply.
    Multiple(DowngradeRegionsReply),
}
720
721fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
722where
723    D: Deserializer<'de>,
724{
725    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
726    Ok(match helper {
727        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
728        DowngradeRegionsCompat::Multiple(reply) => reply,
729    })
730}
731
/// A batch of [UpgradeRegionReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionsReply {
    /// The individual replies.
    pub replies: Vec<UpgradeRegionReply>,
}
736
737impl UpgradeRegionsReply {
738    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
739        Self { replies }
740    }
741
742    pub fn single(reply: UpgradeRegionReply) -> Self {
743        Self::new(vec![reply])
744    }
745}
746
/// Deserialization helper accepting either a single [UpgradeRegionReply]
/// or a batch [UpgradeRegionsReply].
///
/// Being `untagged`, serde tries the variants in declaration order.
#[derive(Deserialize)]
#[serde(untagged)]
enum UpgradeRegionsCompat {
    /// A single reply.
    Single(UpgradeRegionReply),
    /// A batch reply.
    Multiple(UpgradeRegionsReply),
}
753
754fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
755where
756    D: Deserializer<'de>,
757{
758    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
759    Ok(match helper {
760        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
761        UpgradeRegionsCompat::Multiple(reply) => reply,
762    })
763}
764
/// The reply for one [EnterStagingRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionReply {
    /// The [RegionId].
    pub region_id: RegionId,
    /// Returns true if the region is under the new region rule.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
775
/// A batch of [EnterStagingRegionReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionsReply {
    /// The individual replies.
    pub replies: Vec<EnterStagingRegionReply>,
}
780
781impl EnterStagingRegionsReply {
782    pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
783        Self { replies }
784    }
785}
786
/// The reply for the [RemapManifest] instruction.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
    /// Returns false if the region does not exist.
    pub exists: bool,
    /// A map from region IDs to their corresponding remapped manifest paths.
    pub manifest_paths: HashMap<RegionId, String>,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
796
797impl Display for RemapManifestReply {
798    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
799        write!(
800            f,
801            "RemapManifestReply(manifest_paths={:?}, error={:?})",
802            self.manifest_paths, self.error
803        )
804    }
805}
806
/// A batch of [ApplyStagingManifestReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestsReply {
    /// The individual replies.
    pub replies: Vec<ApplyStagingManifestReply>,
}
811
812impl ApplyStagingManifestsReply {
813    pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
814        Self { replies }
815    }
816}
817
/// The reply for one [ApplyStagingManifest].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestReply {
    /// The [RegionId].
    pub region_id: RegionId,
    /// Returns true if the region is ready to serve reads and writes.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
828
/// The reply to an instruction, one variant per reply kind.
///
/// Serialized as an internally tagged enum: the variant name, in `snake_case`,
/// is stored under the `type` key. The singular aliases (e.g. `open_region`)
/// are additionally accepted when deserializing; the upgrade and downgrade
/// variants also accept a single reply in place of a batch through their
/// `deserialize_with` helpers.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
    /// Reply for opening regions.
    #[serde(alias = "open_region")]
    OpenRegions(SimpleReply),
    /// Reply for closing regions.
    #[serde(alias = "close_region")]
    CloseRegions(SimpleReply),
    /// Reply for upgrading regions.
    #[serde(
        deserialize_with = "upgrade_regions_compat_from",
        alias = "upgrade_region"
    )]
    UpgradeRegions(UpgradeRegionsReply),
    /// Reply for downgrading regions.
    #[serde(
        alias = "downgrade_region",
        deserialize_with = "downgrade_regions_compat_from"
    )]
    DowngradeRegions(DowngradeRegionsReply),
    /// Reply for flushing regions.
    FlushRegions(FlushRegionReply),
    /// Reply for getting file references.
    GetFileRefs(GetFileRefsReply),
    /// Reply for garbage collection.
    GcRegions(GcRegionsReply),
    /// Reply for making regions enter staging state.
    EnterStagingRegions(EnterStagingRegionsReply),
    /// Reply for remapping manifests.
    RemapManifest(RemapManifestReply),
    /// Reply for applying staging manifests.
    ApplyStagingManifests(ApplyStagingManifestsReply),
}
853
854impl Display for InstructionReply {
855    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
856        match self {
857            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
858            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
859            Self::UpgradeRegions(reply) => {
860                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
861            }
862            Self::DowngradeRegions(reply) => {
863                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
864            }
865            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
866            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
867            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
868            Self::EnterStagingRegions(reply) => {
869                write!(
870                    f,
871                    "InstructionReply::EnterStagingRegions({:?})",
872                    reply.replies
873                )
874            }
875            Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
876            Self::ApplyStagingManifests(reply) => write!(
877                f,
878                "InstructionReply::ApplyStagingManifests({:?})",
879                reply.replies
880            ),
881        }
882    }
883}
884
/// Test-only accessors that unwrap a specific reply variant and panic on any other.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Unwraps a `CloseRegions` reply; panics on any other variant.
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        if let Self::CloseRegions(inner) = self {
            inner
        } else {
            panic!("Expected CloseRegions reply")
        }
    }

    /// Unwraps an `OpenRegions` reply; panics on any other variant.
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        if let Self::OpenRegions(inner) = self {
            inner
        } else {
            panic!("Expected OpenRegions reply")
        }
    }

    /// Unwraps the replies of an `UpgradeRegions` reply; panics on any other variant.
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        if let Self::UpgradeRegions(batch) = self {
            batch.replies
        } else {
            panic!("Expected UpgradeRegion reply")
        }
    }

    /// Unwraps the replies of a `DowngradeRegions` reply; panics on any other variant.
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        if let Self::DowngradeRegions(batch) = self {
            batch.replies
        } else {
            panic!("Expected DowngradeRegion reply")
        }
    }

    /// Unwraps a `FlushRegions` reply; panics on any other variant.
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        if let Self::FlushRegions(inner) = self {
            inner
        } else {
            panic!("Expected FlushRegions reply")
        }
    }

    /// Unwraps the replies of an `EnterStagingRegions` reply; panics on any other variant.
    pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
        if let Self::EnterStagingRegions(batch) = self {
            batch.replies
        } else {
            panic!("Expected EnterStagingRegion reply")
        }
    }

    /// Unwraps a `RemapManifest` reply; panics on any other variant.
    pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
        if let Self::RemapManifest(inner) = self {
            inner
        } else {
            panic!("Expected RemapManifest reply")
        }
    }

    /// Unwraps the replies of an `ApplyStagingManifests` reply; panics on any other variant.
    pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
        if let Self::ApplyStagingManifests(batch) = self {
            batch.replies
        } else {
            panic!("Expected ApplyStagingManifest reply")
        }
    }
}
943
944#[cfg(test)]
945mod tests {
946    use std::collections::HashSet;
947
948    use store_api::storage::{FileId, FileRef};
949
950    use super::*;
951
952    #[test]
953    fn test_serialize_instruction() {
954        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
955            RegionIdent {
956                datanode_id: 2,
957                table_id: 1024,
958                region_number: 1,
959                engine: "mito2".to_string(),
960            },
961            "test/foo",
962            HashMap::new(),
963            HashMap::new(),
964            false,
965        )]);
966
967        let serialized = serde_json::to_string(&open_region).unwrap();
968        assert_eq!(
969            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
970            serialized
971        );
972
973        let close_region = Instruction::CloseRegions(vec![RegionIdent {
974            datanode_id: 2,
975            table_id: 1024,
976            region_number: 1,
977            engine: "mito2".to_string(),
978        }]);
979
980        let serialized = serde_json::to_string(&close_region).unwrap();
981        assert_eq!(
982            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
983            serialized
984        );
985
986        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
987            region_id: RegionId::new(1024, 1),
988            last_entry_id: None,
989            metadata_last_entry_id: None,
990            replay_timeout: Duration::from_millis(1000),
991            location_id: None,
992            replay_entry_id: None,
993            metadata_replay_entry_id: None,
994        }]);
995
996        let serialized = serde_json::to_string(&upgrade_region).unwrap();
997        assert_eq!(
998            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
999            serialized
1000        );
1001    }
1002
1003    #[test]
1004    fn test_serialize_instruction_reply() {
1005        let downgrade_region_reply = InstructionReply::DowngradeRegions(
1006            DowngradeRegionsReply::single(DowngradeRegionReply {
1007                region_id: RegionId::new(1024, 1),
1008                last_entry_id: None,
1009                metadata_last_entry_id: None,
1010                exists: true,
1011                error: None,
1012            }),
1013        );
1014
1015        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
1016        assert_eq!(
1017            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
1018            serialized
1019        );
1020
1021        let upgrade_region_reply =
1022            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1023                region_id: RegionId::new(1024, 1),
1024                ready: true,
1025                exists: true,
1026                error: None,
1027            }));
1028        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
1029        assert_eq!(
1030            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
1031            serialized
1032        );
1033    }
1034
1035    #[test]
1036    fn test_deserialize_instruction() {
1037        // legacy open region instruction
1038        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
1039        let open_region_instruction: Instruction =
1040            serde_json::from_str(open_region_instruction).unwrap();
1041        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1042            RegionIdent {
1043                datanode_id: 2,
1044                table_id: 1024,
1045                region_number: 1,
1046                engine: "mito2".to_string(),
1047            },
1048            "test/foo",
1049            HashMap::new(),
1050            HashMap::new(),
1051            false,
1052        )]);
1053        assert_eq!(open_region_instruction, open_region);
1054
1055        // legacy close region instruction
1056        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
1057        let close_region_instruction: Instruction =
1058            serde_json::from_str(close_region_instruction).unwrap();
1059        let close_region = Instruction::CloseRegions(vec![RegionIdent {
1060            datanode_id: 2,
1061            table_id: 1024,
1062            region_number: 1,
1063            engine: "mito2".to_string(),
1064        }]);
1065        assert_eq!(close_region_instruction, close_region);
1066
1067        // legacy downgrade region instruction
1068        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
1069        let downgrade_region_instruction: Instruction =
1070            serde_json::from_str(downgrade_region_instruction).unwrap();
1071        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
1072            region_id: RegionId::new(1024, 1),
1073            flush_timeout: Some(Duration::from_millis(1000)),
1074        }]);
1075        assert_eq!(downgrade_region_instruction, downgrade_region);
1076
1077        // legacy upgrade region instruction
1078        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
1079        let upgrade_region_instruction: Instruction =
1080            serde_json::from_str(upgrade_region_instruction).unwrap();
1081        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1082            region_id: RegionId::new(1024, 1),
1083            last_entry_id: None,
1084            metadata_last_entry_id: None,
1085            replay_timeout: Duration::from_millis(1000),
1086            location_id: None,
1087            replay_entry_id: None,
1088            metadata_replay_entry_id: None,
1089        }]);
1090        assert_eq!(upgrade_region_instruction, upgrade_region);
1091    }
1092
1093    #[test]
1094    fn test_deserialize_instruction_reply() {
1095        // legacy close region reply
1096        let close_region_instruction_reply =
1097            r#"{"result":true,"error":null,"type":"close_region"}"#;
1098        let close_region_instruction_reply: InstructionReply =
1099            serde_json::from_str(close_region_instruction_reply).unwrap();
1100        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
1101            result: true,
1102            error: None,
1103        });
1104        assert_eq!(close_region_instruction_reply, close_region_reply);
1105
1106        // legacy open region reply
1107        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
1108        let open_region_instruction_reply: InstructionReply =
1109            serde_json::from_str(open_region_instruction_reply).unwrap();
1110        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1111            result: true,
1112            error: None,
1113        });
1114        assert_eq!(open_region_instruction_reply, open_region_reply);
1115
1116        // legacy downgrade region reply
1117        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1118        let downgrade_region_instruction_reply: InstructionReply =
1119            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1120        let downgrade_region_reply = InstructionReply::DowngradeRegions(
1121            DowngradeRegionsReply::single(DowngradeRegionReply {
1122                region_id: RegionId::new(1024, 1),
1123                last_entry_id: None,
1124                metadata_last_entry_id: None,
1125                exists: true,
1126                error: None,
1127            }),
1128        );
1129        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1130
1131        // legacy upgrade region reply
1132        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1133        let upgrade_region_instruction_reply: InstructionReply =
1134            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1135        let upgrade_region_reply =
1136            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1137                region_id: RegionId::new(1024, 1),
1138                ready: true,
1139                exists: true,
1140                error: None,
1141            }));
1142        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1143    }
1144
    /// Test-only stand-in for an `OpenRegion` payload that has only the
    /// `region_ident`, `region_storage_path` and `region_options` fields.
    ///
    /// `test_compatible_serialize_open_region` serializes this struct and then
    /// deserializes the result as `OpenRegion`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }
1151
1152    #[test]
1153    fn test_compatible_serialize_open_region() {
1154        let region_ident = RegionIdent {
1155            datanode_id: 2,
1156            table_id: 1024,
1157            region_number: 1,
1158            engine: "mito2".to_string(),
1159        };
1160        let region_storage_path = "test/foo".to_string();
1161        let region_options = HashMap::from([
1162            ("a".to_string(), "aa".to_string()),
1163            ("b".to_string(), "bb".to_string()),
1164        ]);
1165
1166        // Serialize a legacy OpenRegion.
1167        let legacy_open_region = LegacyOpenRegion {
1168            region_ident: region_ident.clone(),
1169            region_storage_path: region_storage_path.clone(),
1170            region_options: region_options.clone(),
1171        };
1172        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1173
1174        // Deserialize to OpenRegion.
1175        let deserialized = serde_json::from_str(&serialized).unwrap();
1176        let expected = OpenRegion {
1177            region_ident,
1178            region_storage_path,
1179            region_options,
1180            region_wal_options: HashMap::new(),
1181            skip_wal_replay: false,
1182        };
1183        assert_eq!(expected, deserialized);
1184    }
1185
1186    #[test]
1187    fn test_flush_regions_creation() {
1188        let region_id = RegionId::new(1024, 1);
1189
1190        // Single region sync flush
1191        let single_sync = FlushRegions::sync_single(region_id);
1192        assert_eq!(single_sync.region_ids, vec![region_id]);
1193        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1194        assert!(!single_sync.is_hint());
1195        assert!(single_sync.is_sync());
1196        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1197        assert!(single_sync.is_single_region());
1198        assert_eq!(single_sync.single_region_id(), Some(region_id));
1199
1200        // Batch async flush (hint)
1201        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1202        let batch_async = FlushRegions::async_batch(region_ids.clone());
1203        assert_eq!(batch_async.region_ids, region_ids);
1204        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1205        assert!(batch_async.is_hint());
1206        assert!(!batch_async.is_sync());
1207        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1208        assert!(!batch_async.is_single_region());
1209        assert_eq!(batch_async.single_region_id(), None);
1210
1211        // Batch sync flush
1212        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1213        assert_eq!(batch_sync.region_ids, region_ids);
1214        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1215        assert!(!batch_sync.is_hint());
1216        assert!(batch_sync.is_sync());
1217        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1218    }
1219
1220    #[test]
1221    fn test_flush_regions_conversion() {
1222        let region_id = RegionId::new(1024, 1);
1223
1224        let from_region_id: FlushRegions = region_id.into();
1225        assert_eq!(from_region_id.region_ids, vec![region_id]);
1226        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1227        assert!(!from_region_id.is_hint());
1228        assert!(from_region_id.is_sync());
1229
1230        // Test default construction
1231        let flush_regions = FlushRegions {
1232            region_ids: vec![region_id],
1233            strategy: FlushStrategy::Async,
1234            error_strategy: FlushErrorStrategy::TryAll,
1235        };
1236        assert_eq!(flush_regions.region_ids, vec![region_id]);
1237        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1238        assert!(flush_regions.is_hint());
1239        assert!(!flush_regions.is_sync());
1240    }
1241
1242    #[test]
1243    fn test_flush_region_reply() {
1244        let region_id = RegionId::new(1024, 1);
1245
1246        // Successful single region reply
1247        let success_reply = FlushRegionReply::success_single(region_id);
1248        assert!(success_reply.overall_success);
1249        assert_eq!(success_reply.results.len(), 1);
1250        assert_eq!(success_reply.results[0].0, region_id);
1251        assert!(success_reply.results[0].1.is_ok());
1252
1253        // Failed single region reply
1254        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1255        assert!(!error_reply.overall_success);
1256        assert_eq!(error_reply.results.len(), 1);
1257        assert_eq!(error_reply.results[0].0, region_id);
1258        assert!(error_reply.results[0].1.is_err());
1259
1260        // Batch reply
1261        let region_id2 = RegionId::new(1024, 2);
1262        let results = vec![
1263            (region_id, Ok(())),
1264            (region_id2, Err("flush failed".to_string())),
1265        ];
1266        let batch_reply = FlushRegionReply::from_results(results);
1267        assert!(!batch_reply.overall_success);
1268        assert_eq!(batch_reply.results.len(), 2);
1269
1270        // Conversion to SimpleReply
1271        let simple_reply = batch_reply.to_simple_reply();
1272        assert!(!simple_reply.result);
1273        assert!(simple_reply.error.is_some());
1274        assert!(simple_reply.error.unwrap().contains("flush failed"));
1275    }
1276
1277    #[test]
1278    fn test_serialize_flush_regions_instruction() {
1279        let region_id = RegionId::new(1024, 1);
1280        let flush_regions = FlushRegions::sync_single(region_id);
1281        let instruction = Instruction::FlushRegions(flush_regions.clone());
1282
1283        let serialized = serde_json::to_string(&instruction).unwrap();
1284        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1285
1286        match deserialized {
1287            Instruction::FlushRegions(fr) => {
1288                assert_eq!(fr.region_ids, vec![region_id]);
1289                assert_eq!(fr.strategy, FlushStrategy::Sync);
1290                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1291            }
1292            _ => panic!("Expected FlushRegions instruction"),
1293        }
1294    }
1295
1296    #[test]
1297    fn test_serialize_flush_regions_batch_instruction() {
1298        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1299        let flush_regions =
1300            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1301        let instruction = Instruction::FlushRegions(flush_regions);
1302
1303        let serialized = serde_json::to_string(&instruction).unwrap();
1304        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1305
1306        match deserialized {
1307            Instruction::FlushRegions(fr) => {
1308                assert_eq!(fr.region_ids, region_ids);
1309                assert_eq!(fr.strategy, FlushStrategy::Sync);
1310                assert!(!fr.is_hint());
1311                assert!(fr.is_sync());
1312                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1313            }
1314            _ => panic!("Expected FlushRegions instruction"),
1315        }
1316    }
1317
1318    #[test]
1319    fn test_serialize_get_file_refs_instruction_reply() {
1320        let mut manifest = FileRefsManifest::default();
1321        let r0 = RegionId::new(1024, 1);
1322        let r1 = RegionId::new(1024, 2);
1323        manifest.file_refs.insert(
1324            r0,
1325            HashSet::from([FileRef::new(r0, FileId::random(), None)]),
1326        );
1327        manifest.file_refs.insert(
1328            r1,
1329            HashSet::from([FileRef::new(r1, FileId::random(), None)]),
1330        );
1331        manifest.manifest_version.insert(r0, 10);
1332        manifest.manifest_version.insert(r1, 20);
1333
1334        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1335            file_refs_manifest: manifest,
1336            success: true,
1337            error: None,
1338        });
1339
1340        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1341        let deserialized = serde_json::from_str(&serialized).unwrap();
1342
1343        assert_eq!(instruction_reply, deserialized);
1344    }
1345}