common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::region_engine::SyncRegionFromRequest;
21use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
22use strum::Display;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::flow_name::FlowName;
27use crate::key::schema_name::SchemaName;
28use crate::key::{FlowId, FlowPartitionId};
29use crate::peer::Peer;
30use crate::{DatanodeId, FlownodeId};
31
/// Bundles a datanode id, a table id, a region number and an engine name.
///
/// The table id and region number together form the [RegionId]
/// (see [`RegionIdent::get_region_id`]).
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct RegionIdent {
    /// Id of the datanode.
    pub datanode_id: DatanodeId,
    /// Id of the table the region belongs to.
    pub table_id: TableId,
    /// Number of the region within the table.
    pub region_number: RegionNumber,
    /// Engine name.
    /// NOTE(review): presumably the name of the engine serving the region — confirm.
    pub engine: String,
}
39
40impl RegionIdent {
41    pub fn get_region_id(&self) -> RegionId {
42        RegionId::new(self.table_id, self.region_number)
43    }
44}
45
46impl Display for RegionIdent {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        write!(
49            f,
50            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
51            self.datanode_id, self.table_id, self.region_number, self.engine
52        )
53    }
54}
55
/// The result of downgrading a leader region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
    /// The [RegionId].
    /// For compatibility, a missing value deserializes to the default,
    /// documented as [RegionId::new(0, 0)].
    #[serde(default)]
    pub region_id: RegionId,
    /// The `last_entry_id`, if available.
    pub last_entry_id: Option<u64>,
    /// The `metadata_last_entry_id`, if available (only available for the metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// The error, if any occurred during the operation.
    pub error: Option<String>,
}
72
73impl Display for DowngradeRegionReply {
74    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75        write!(
76            f,
77            "(last_entry_id={:?}, exists={}, error={:?})",
78            self.last_entry_id, self.exists, self.error
79        )
80    }
81}
82
/// A minimal reply: a success flag plus an optional error message.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
    /// `true` when the operation succeeded.
    pub result: bool,
    /// The error message, if any.
    pub error: Option<String>,
}
88
/// Reply for flush region operations with support for batch results.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct FlushRegionReply {
    /// Results for each region that was attempted to be flushed.
    /// For single region flushes, this will contain one result.
    /// For batch flushes, this contains results for all attempted regions.
    /// Each entry pairs the region id with `Ok(())` or an error message.
    pub results: Vec<(RegionId, Result<(), String>)>,
    /// Overall success: true if all regions were flushed successfully.
    /// `from_results` computes this from `results`; since the field is public,
    /// other constructors of this struct must keep it consistent themselves.
    pub overall_success: bool,
}
99
100impl FlushRegionReply {
101    /// Create a successful single region reply.
102    pub fn success_single(region_id: RegionId) -> Self {
103        Self {
104            results: vec![(region_id, Ok(()))],
105            overall_success: true,
106        }
107    }
108
109    /// Create a failed single region reply.
110    pub fn error_single(region_id: RegionId, error: String) -> Self {
111        Self {
112            results: vec![(region_id, Err(error))],
113            overall_success: false,
114        }
115    }
116
117    /// Create a batch reply from individual results.
118    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
119        let overall_success = results.iter().all(|(_, result)| result.is_ok());
120        Self {
121            results,
122            overall_success,
123        }
124    }
125
126    /// Convert to SimpleReply for backward compatibility.
127    pub fn to_simple_reply(&self) -> SimpleReply {
128        if self.overall_success {
129            SimpleReply {
130                result: true,
131                error: None,
132            }
133        } else {
134            let errors: Vec<String> = self
135                .results
136                .iter()
137                .filter_map(|(region_id, result)| {
138                    result
139                        .as_ref()
140                        .err()
141                        .map(|err| format!("{}: {}", region_id, err))
142                })
143                .collect();
144            SimpleReply {
145                result: false,
146                error: Some(errors.join("; ")),
147            }
148        }
149    }
150}
151
152impl Display for SimpleReply {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(f, "(result={}, error={:?})", self.result, self.error)
155    }
156}
157
158impl Display for FlushRegionReply {
159    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
160        let results_str = self
161            .results
162            .iter()
163            .map(|(region_id, result)| match result {
164                Ok(()) => format!("{}:OK", region_id),
165                Err(err) => format!("{}:ERR({})", region_id, err),
166            })
167            .collect::<Vec<_>>()
168            .join(", ");
169        write!(
170            f,
171            "(overall_success={}, results=[{}])",
172            self.overall_success, results_str
173        )
174    }
175}
176
177impl Display for OpenRegion {
178    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
179        write!(
180            f,
181            "OpenRegion(region_ident={}, region_storage_path={})",
182            self.region_ident, self.region_storage_path
183        )
184    }
185}
186
/// Payload describing one region to open (carried by `Instruction::OpenRegions`).
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
    /// Identifier of the region.
    pub region_ident: RegionIdent,
    /// Storage path of the region.
    pub region_storage_path: String,
    /// Region options as string key/value pairs.
    pub region_options: HashMap<String, String>,
    /// WAL options keyed by region number.
    /// Keys are (de)serialized through their string form (`DisplayFromStr`);
    /// a missing field deserializes to an empty map.
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
    /// Whether to skip WAL replay; a missing field deserializes to `false`.
    #[serde(default)]
    pub skip_wal_replay: bool,
}
199
200impl OpenRegion {
201    pub fn new(
202        region_ident: RegionIdent,
203        path: &str,
204        region_options: HashMap<String, String>,
205        region_wal_options: HashMap<RegionNumber, String>,
206        skip_wal_replay: bool,
207    ) -> Self {
208        Self {
209            region_ident,
210            region_storage_path: path.to_string(),
211            region_options,
212            region_wal_options,
213            skip_wal_replay,
214        }
215    }
216}
217
/// The instruction of downgrading a leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The timeout of waiting for the region flush.
    ///
    /// `None` means the region is not flushed before being downgraded.
    /// A missing field deserializes to `None`.
    #[serde(default)]
    pub flush_timeout: Option<Duration>,
}
229
230impl Display for DowngradeRegion {
231    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232        write!(
233            f,
234            "DowngradeRegion(region_id={}, flush_timeout={:?})",
235            self.region_id, self.flush_timeout,
236        )
237    }
238}
239
/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UpgradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The `last_entry_id` of old leader region.
    pub last_entry_id: Option<u64>,
    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// The timeout of waiting for a wal replay.
    ///
    /// (De)serialized in human-readable form through `humantime_serde`.
    /// NOTE(review): this is a plain `Duration`, so there is no `None` case;
    /// presumably a zero duration means "do not wait" — confirm against the handler.
    #[serde(with = "humantime_serde")]
    pub replay_timeout: Duration,
    /// The hint for replaying memtable.
    #[serde(default)]
    pub location_id: Option<u64>,
    /// Optional replay entry id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay_entry_id: Option<u64>,
    /// Optional metadata replay entry id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata_replay_entry_id: Option<u64>,
}
263
264impl UpgradeRegion {
265    /// Sets the replay entry id.
266    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
267        self.replay_entry_id = replay_entry_id;
268        self
269    }
270
271    /// Sets the metadata replay entry id.
272    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
273        self.metadata_replay_entry_id = metadata_replay_entry_id;
274        self
275    }
276}
277
/// The identifier of cache.
///
/// Carried in a list by `Instruction::InvalidateCaches`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CacheIdent {
    /// Identified by flow id.
    FlowId(FlowId),
    /// Indicate change of address of flownode.
    /// NOTE(review): the `u64` is presumably the flownode id — confirm.
    FlowNodeAddressChange(u64),
    /// Identified by flow name.
    FlowName(FlowName),
    /// Identified by table id.
    TableId(TableId),
    /// Identified by table name.
    TableName(TableName),
    /// Identified by schema name.
    SchemaName(SchemaName),
    /// Carries the details of a created flow (see [CreateFlow]).
    CreateFlow(CreateFlow),
    /// Carries the details of a dropped flow (see [DropFlow]).
    DropFlow(DropFlow),
}
291
/// Flow details carried by `CacheIdent::CreateFlow`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
    /// The unique identifier for the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to peer information
    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
300
/// Flow details carried by `CacheIdent::DropFlow`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
    /// The identifier of the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to flownode id
    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
308
/// Strategy for executing flush operations.
///
/// Defaults to [`FlushStrategy::Sync`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushStrategy {
    /// Synchronous operation that waits for completion and expects a reply
    #[default]
    Sync,
    /// Asynchronous hint operation (fire-and-forget, no reply expected)
    Async,
}
318
/// Error handling strategy for batch flush operations.
///
/// Defaults to [`FlushErrorStrategy::FailFast`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushErrorStrategy {
    /// Abort on first error (fail-fast)
    #[default]
    FailFast,
    /// Attempt to flush all regions and collect all errors
    TryAll,
}
328
/// Unified flush instruction supporting both single and batch operations
/// with configurable execution strategies and error handling.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
    /// List of region IDs to flush. Can contain a single region or multiple regions.
    pub region_ids: Vec<RegionId>,
    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
    /// A missing field deserializes to the default, `FlushStrategy::Sync`.
    #[serde(default)]
    pub strategy: FlushStrategy,
    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
    /// A missing field deserializes to the default, `FlushErrorStrategy::FailFast`.
    #[serde(default)]
    pub error_strategy: FlushErrorStrategy,
}
342
343impl Display for FlushRegions {
344    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
345        write!(
346            f,
347            "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
348            self.region_ids, self.strategy, self.error_strategy
349        )
350    }
351}
352
353impl FlushRegions {
354    /// Create synchronous single-region flush
355    pub fn sync_single(region_id: RegionId) -> Self {
356        Self {
357            region_ids: vec![region_id],
358            strategy: FlushStrategy::Sync,
359            error_strategy: FlushErrorStrategy::FailFast,
360        }
361    }
362
363    /// Create asynchronous batch flush (fire-and-forget)
364    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
365        Self {
366            region_ids,
367            strategy: FlushStrategy::Async,
368            error_strategy: FlushErrorStrategy::TryAll,
369        }
370    }
371
372    /// Create synchronous batch flush with error strategy
373    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
374        Self {
375            region_ids,
376            strategy: FlushStrategy::Sync,
377            error_strategy,
378        }
379    }
380
381    /// Check if this is a single region flush.
382    pub fn is_single_region(&self) -> bool {
383        self.region_ids.len() == 1
384    }
385
386    /// Get the single region ID if this is a single region flush.
387    pub fn single_region_id(&self) -> Option<RegionId> {
388        if self.is_single_region() {
389            self.region_ids.first().copied()
390        } else {
391            None
392        }
393    }
394
395    /// Check if this is a hint (asynchronous) operation.
396    pub fn is_hint(&self) -> bool {
397        matches!(self.strategy, FlushStrategy::Async)
398    }
399
400    /// Check if this is a synchronous operation.
401    pub fn is_sync(&self) -> bool {
402        matches!(self.strategy, FlushStrategy::Sync)
403    }
404}
405
406impl From<RegionId> for FlushRegions {
407    fn from(region_id: RegionId) -> Self {
408        Self::sync_single(region_id)
409    }
410}
411
/// Deserialization helper that accepts either one `T` or a list of `T`.
///
/// Being `untagged`, serde tries the variants in declaration order:
/// `Single` first, then `Multiple`.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A lone value.
    Single(T),
    /// A list of values.
    Multiple(Vec<T>),
}
418
419fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
420where
421    D: Deserializer<'de>,
422    T: Deserialize<'de>,
423{
424    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
425    Ok(match helper {
426        SingleOrMultiple::Single(x) => vec![x],
427        SingleOrMultiple::Multiple(xs) => xs,
428    })
429}
430
/// Instruction to get file references for specified regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
    /// List of region IDs to get file references from active FileHandles (in-memory).
    pub query_regions: Vec<RegionId>,
    /// Mapping from the source region IDs (whose file references to look for) to
    /// the destination region IDs (where to read the manifests).
    /// Key: the source region ID (where files originally came from).
    /// Value: the set of destination region IDs (whose manifests need to be read).
    pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
}
442
443impl Display for GetFileRefs {
444    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
446    }
447}
448
/// Instruction to trigger garbage collection for a list of regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
    /// The region IDs to perform GC on. Only regions that are currently on the
    /// given datanode can be garbage collected; regions not on the datanode
    /// will report errors.
    pub regions: Vec<RegionId>,
    /// The file references manifest containing temporary file references.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether to perform a full file listing to find orphan files.
    pub full_file_listing: bool,
}
459
460impl Display for GcRegions {
461    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
462        write!(
463            f,
464            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
465            self.regions,
466            self.file_refs_manifest.file_refs.len(),
467            self.full_file_listing
468        )
469    }
470}
471
/// Reply for the [GetFileRefs] instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefsReply {
    /// The file references manifest.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether the operation was successful.
    pub success: bool,
    /// Error message if any.
    pub error: Option<String>,
}
482
483impl Display for GetFileRefsReply {
484    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
485        write!(
486            f,
487            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
488            self.success,
489            self.file_refs_manifest.file_refs.len(),
490            self.error
491        )
492    }
493}
494
/// Reply for the [GcRegions] instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegionsReply {
    /// `Ok` carries the [GcReport]; `Err` carries an error message.
    pub result: Result<GcReport, String>,
}
500
501impl Display for GcRegionsReply {
502    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
503        write!(
504            f,
505            "GcReply(result={})",
506            match &self.result {
507                Ok(report) => format!(
508                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
509                    report.deleted_files.len(),
510                    report.need_retry_regions.len()
511                ),
512                Err(err) => format!("Err({})", err),
513            }
514        )
515    }
516}
517
/// Payload for one region of `Instruction::EnterStagingRegions`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EnterStagingRegion {
    /// The region that should enter the staging state.
    pub region_id: RegionId,
    /// Partition expression, as a string.
    /// NOTE(review): presumably the partition rule applied while staging — confirm.
    pub partition_expr: String,
}
523
524impl Display for EnterStagingRegion {
525    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
526        write!(
527            f,
528            "EnterStagingRegion(region_id={}, partition_expr={})",
529            self.region_id, self.partition_expr
530        )
531    }
532}
533
/// Instruction payload for syncing a region from a manifest or another region.
///
/// Carried in a list by `Instruction::SyncRegions`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncRegion {
    /// Region id to sync.
    pub region_id: RegionId,
    /// Request to sync the region.
    pub request: SyncRegionFromRequest,
}
542
543impl Display for SyncRegion {
544    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
545        write!(
546            f,
547            "SyncRegion(region_id={}, request={:?})",
548            self.region_id, self.request
549        )
550    }
551}
552
/// Payload of `Instruction::RemapManifest`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
    /// The [RegionId].
    /// NOTE(review): its role relative to `input_regions` is not evident from
    /// this file — confirm against the handler.
    pub region_id: RegionId,
    /// Regions to remap manifests from.
    pub input_regions: Vec<RegionId>,
    /// For each old region, which new regions should receive its files
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub new_partition_exprs: HashMap<RegionId, String>,
}
563
564impl Display for RemapManifest {
565    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
566        write!(
567            f,
568            "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
569            self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
570        )
571    }
572}
573
/// Payload for one region of `Instruction::ApplyStagingManifests`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApplyStagingManifest {
    /// The region ID to apply the staging manifest to.
    pub region_id: RegionId,
    /// The partition expression of the staging region.
    pub partition_expr: String,
    /// The region that stores the staging manifests in its staging blob storage.
    pub central_region_id: RegionId,
    /// The relative path to the staging manifest within the central region's staging blob storage.
    pub manifest_path: String,
}
585
586impl Display for ApplyStagingManifest {
587    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
588        write!(
589            f,
590            "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
591            self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
592        )
593    }
594}
595
/// An instruction together with its payload.
///
/// `Display` is derived through `strum`. The first four variants deserialize
/// through `single_or_multiple_from`, so their payload may be either a single
/// value or a list, and each also accepts its singular name as an alias.
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
    /// Opens regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
    OpenRegions(Vec<OpenRegion>),
    /// Closes regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
    CloseRegions(Vec<RegionIdent>),
    /// Upgrades regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
    UpgradeRegions(Vec<UpgradeRegion>),
    /// Downgrades regions.
    #[serde(
        deserialize_with = "single_or_multiple_from",
        alias = "DowngradeRegion"
    )]
    DowngradeRegions(Vec<DowngradeRegion>),
    /// Invalidates batch cache.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegions(FlushRegions),
    /// Gets file references for regions.
    GetFileRefs(GetFileRefs),
    /// Triggers garbage collection for regions.
    GcRegions(GcRegions),
    /// Temporary suspend serving reads or writes
    Suspend,
    /// Makes regions enter staging state.
    EnterStagingRegions(Vec<EnterStagingRegion>),
    /// Syncs regions.
    SyncRegions(Vec<SyncRegion>),
    /// Remaps manifests for a region.
    RemapManifest(RemapManifest),

    /// Applies staging manifests for regions.
    ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
633
634impl Instruction {
635    /// Converts the instruction into a vector of [OpenRegion].
636    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
637        match self {
638            Self::OpenRegions(open_regions) => Some(open_regions),
639            _ => None,
640        }
641    }
642
643    /// Converts the instruction into a vector of [RegionIdent].
644    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
645        match self {
646            Self::CloseRegions(close_regions) => Some(close_regions),
647            _ => None,
648        }
649    }
650
651    /// Converts the instruction into a [FlushRegions].
652    pub fn into_flush_regions(self) -> Option<FlushRegions> {
653        match self {
654            Self::FlushRegions(flush_regions) => Some(flush_regions),
655            _ => None,
656        }
657    }
658
659    /// Converts the instruction into a [DowngradeRegion].
660    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
661        match self {
662            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
663            _ => None,
664        }
665    }
666
667    /// Converts the instruction into a [UpgradeRegion].
668    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
669        match self {
670            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
671            _ => None,
672        }
673    }
674
675    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
676        match self {
677            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
678            _ => None,
679        }
680    }
681
682    pub fn into_gc_regions(self) -> Option<GcRegions> {
683        match self {
684            Self::GcRegions(gc_regions) => Some(gc_regions),
685            _ => None,
686        }
687    }
688
689    pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
690        match self {
691            Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
692            _ => None,
693        }
694    }
695
696    pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
697        match self {
698            Self::SyncRegions(sync_regions) => Some(sync_regions),
699            _ => None,
700        }
701    }
702}
703
/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
    /// The [RegionId].
    /// For compatibility, a missing value deserializes to the default,
    /// documented as [RegionId::new(0, 0)].
    #[serde(default)]
    pub region_id: RegionId,
    /// Returns true if `last_entry_id` has been replayed to the latest.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Returns error if any.
    pub error: Option<String>,
}
718
719impl Display for UpgradeRegionReply {
720    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
721        write!(
722            f,
723            "(ready={}, exists={}, error={:?})",
724            self.ready, self.exists, self.error
725        )
726    }
727}
728
/// A batch of [DowngradeRegionReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionsReply {
    /// The individual replies.
    pub replies: Vec<DowngradeRegionReply>,
}
733
734impl DowngradeRegionsReply {
735    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
736        Self { replies }
737    }
738
739    pub fn single(reply: DowngradeRegionReply) -> Self {
740        Self::new(vec![reply])
741    }
742}
743
/// Deserialization helper accepting either a single [DowngradeRegionReply]
/// or a batched [DowngradeRegionsReply].
///
/// Being `untagged`, serde tries `Single` first, then `Multiple`.
#[derive(Deserialize)]
#[serde(untagged)]
enum DowngradeRegionsCompat {
    /// The single-reply shape.
    Single(DowngradeRegionReply),
    /// The batched shape.
    Multiple(DowngradeRegionsReply),
}
750
751fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
752where
753    D: Deserializer<'de>,
754{
755    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
756    Ok(match helper {
757        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
758        DowngradeRegionsCompat::Multiple(reply) => reply,
759    })
760}
761
/// A batch of [UpgradeRegionReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionsReply {
    /// The individual replies.
    pub replies: Vec<UpgradeRegionReply>,
}
766
767impl UpgradeRegionsReply {
768    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
769        Self { replies }
770    }
771
772    pub fn single(reply: UpgradeRegionReply) -> Self {
773        Self::new(vec![reply])
774    }
775}
776
/// Deserialization helper accepting either a single [UpgradeRegionReply]
/// or a batched [UpgradeRegionsReply].
///
/// Being `untagged`, serde tries `Single` first, then `Multiple`.
#[derive(Deserialize)]
#[serde(untagged)]
enum UpgradeRegionsCompat {
    /// The single-reply shape.
    Single(UpgradeRegionReply),
    /// The batched shape.
    Multiple(UpgradeRegionsReply),
}
783
784fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
785where
786    D: Deserializer<'de>,
787{
788    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
789    Ok(match helper {
790        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
791        UpgradeRegionsCompat::Multiple(reply) => reply,
792    })
793}
794
/// Reply for a single [EnterStagingRegion] request.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionReply {
    /// The [RegionId] this reply refers to.
    pub region_id: RegionId,
    /// Returns true if the region is under the new region rule.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
805
/// A batch of [EnterStagingRegionReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionsReply {
    /// The individual replies.
    pub replies: Vec<EnterStagingRegionReply>,
}
810
811impl EnterStagingRegionsReply {
812    pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
813        Self { replies }
814    }
815}
816
/// Reply for a single region sync request (see [SyncRegion]).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionReply {
    /// Region id of the synced region.
    pub region_id: RegionId,
    /// Returns true if the region is successfully synced and ready.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// The error message, if any occurred during the operation.
    pub error: Option<String>,
}
829
/// Reply for a batch of region sync requests.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionsReply {
    /// The individual replies.
    pub replies: Vec<SyncRegionReply>,
}
835
836impl SyncRegionsReply {
837    pub fn new(replies: Vec<SyncRegionReply>) -> Self {
838        Self { replies }
839    }
840}
841
/// Reply for the [RemapManifest] instruction.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
    /// Returns false if the region does not exist.
    pub exists: bool,
    /// A map from region IDs to their corresponding remapped manifest paths.
    pub manifest_paths: HashMap<RegionId, String>,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
851
852impl Display for RemapManifestReply {
853    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
854        write!(
855            f,
856            "RemapManifestReply(manifest_paths={:?}, error={:?})",
857            self.manifest_paths, self.error
858        )
859    }
860}
861
/// A batch of [ApplyStagingManifestReply] values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestsReply {
    /// The individual replies.
    pub replies: Vec<ApplyStagingManifestReply>,
}
866
867impl ApplyStagingManifestsReply {
868    pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
869        Self { replies }
870    }
871}
872
/// Reply for a single [ApplyStagingManifest] request.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestReply {
    /// The [RegionId] this reply refers to.
    pub region_id: RegionId,
    /// Returns true if the region is ready to serve reads and writes.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
883
/// Reply to an [Instruction].
///
/// Serialized as an internally tagged enum: the variant name, converted to
/// `snake_case`, is stored under the `"type"` key. Several variants also accept
/// their singular name as an alias when deserializing.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
    /// Reply for opening regions; also accepts the `open_region` tag.
    #[serde(alias = "open_region")]
    OpenRegions(SimpleReply),
    /// Reply for closing regions; also accepts the `close_region` tag.
    #[serde(alias = "close_region")]
    CloseRegions(SimpleReply),
    /// Reply for upgrading regions; also accepts the `upgrade_region` tag.
    /// Deserialized through `upgrade_regions_compat_from`, which accepts either
    /// a single reply or a batch.
    #[serde(
        deserialize_with = "upgrade_regions_compat_from",
        alias = "upgrade_region"
    )]
    UpgradeRegions(UpgradeRegionsReply),
    /// Reply for downgrading regions; also accepts the `downgrade_region` tag.
    /// Deserialized through `downgrade_regions_compat_from`, which accepts either
    /// a single reply or a batch.
    #[serde(
        alias = "downgrade_region",
        deserialize_with = "downgrade_regions_compat_from"
    )]
    DowngradeRegions(DowngradeRegionsReply),
    /// Reply for flushing regions.
    FlushRegions(FlushRegionReply),
    /// Reply for getting file references.
    GetFileRefs(GetFileRefsReply),
    /// Reply for garbage collection.
    GcRegions(GcRegionsReply),
    /// Reply for making regions enter staging state.
    EnterStagingRegions(EnterStagingRegionsReply),
    /// Reply for syncing regions.
    SyncRegions(SyncRegionsReply),
    /// Reply for remapping manifests.
    RemapManifest(RemapManifestReply),
    /// Reply for applying staging manifests.
    ApplyStagingManifests(ApplyStagingManifestsReply),
}
909
910impl Display for InstructionReply {
911    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
912        match self {
913            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
914            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
915            Self::UpgradeRegions(reply) => {
916                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
917            }
918            Self::DowngradeRegions(reply) => {
919                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
920            }
921            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
922            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
923            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
924            Self::EnterStagingRegions(reply) => {
925                write!(
926                    f,
927                    "InstructionReply::EnterStagingRegions({:?})",
928                    reply.replies
929                )
930            }
931            Self::SyncRegions(reply) => {
932                write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
933            }
934            Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
935            Self::ApplyStagingManifests(reply) => write!(
936                f,
937                "InstructionReply::ApplyStagingManifests({:?})",
938                reply.replies
939            ),
940        }
941    }
942}
943
/// Test-only helpers that unwrap an [`InstructionReply`] into the payload of the expected
/// variant.
///
/// Every helper panics when `self` is a different variant. The panic message names the
/// expected reply and shows the reply that was actually received, and `#[track_caller]`
/// makes the panic location point at the calling test rather than at this file.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Returns the payload of a `CloseRegions` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        match self {
            Self::CloseRegions(reply) => reply,
            other => panic!("Expected CloseRegions reply, got {}", other),
        }
    }

    /// Returns the payload of an `OpenRegions` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        match self {
            Self::OpenRegions(reply) => reply,
            other => panic!("Expected OpenRegions reply, got {}", other),
        }
    }

    /// Returns the per-region replies of an `UpgradeRegions` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        match self {
            Self::UpgradeRegions(reply) => reply.replies,
            other => panic!("Expected UpgradeRegion reply, got {}", other),
        }
    }

    /// Returns the per-region replies of a `DowngradeRegions` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        match self {
            Self::DowngradeRegions(reply) => reply.replies,
            other => panic!("Expected DowngradeRegion reply, got {}", other),
        }
    }

    /// Returns the payload of a `FlushRegions` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        match self {
            Self::FlushRegions(reply) => reply,
            other => panic!("Expected FlushRegions reply, got {}", other),
        }
    }

    /// Returns the per-region replies of an `EnterStagingRegions` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
        match self {
            Self::EnterStagingRegions(reply) => reply.replies,
            other => panic!("Expected EnterStagingRegion reply, got {}", other),
        }
    }

    /// Returns the per-region replies of a `SyncRegions` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
        match self {
            Self::SyncRegions(reply) => reply.replies,
            other => panic!("Expected SyncRegion reply, got {}", other),
        }
    }

    /// Returns the payload of a `RemapManifest` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
        match self {
            Self::RemapManifest(reply) => reply,
            other => panic!("Expected RemapManifest reply, got {}", other),
        }
    }

    /// Returns the per-region replies of an `ApplyStagingManifests` reply.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
        match self {
            Self::ApplyStagingManifests(reply) => reply.replies,
            other => panic!("Expected ApplyStagingManifest reply, got {}", other),
        }
    }
}
1009
1010#[cfg(test)]
1011mod tests {
1012    use std::collections::HashSet;
1013
1014    use store_api::storage::{FileId, FileRef};
1015
1016    use super::*;
1017
1018    #[test]
1019    fn test_serialize_instruction() {
1020        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1021            RegionIdent {
1022                datanode_id: 2,
1023                table_id: 1024,
1024                region_number: 1,
1025                engine: "mito2".to_string(),
1026            },
1027            "test/foo",
1028            HashMap::new(),
1029            HashMap::new(),
1030            false,
1031        )]);
1032
1033        let serialized = serde_json::to_string(&open_region).unwrap();
1034        assert_eq!(
1035            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
1036            serialized
1037        );
1038
1039        let close_region = Instruction::CloseRegions(vec![RegionIdent {
1040            datanode_id: 2,
1041            table_id: 1024,
1042            region_number: 1,
1043            engine: "mito2".to_string(),
1044        }]);
1045
1046        let serialized = serde_json::to_string(&close_region).unwrap();
1047        assert_eq!(
1048            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
1049            serialized
1050        );
1051
1052        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1053            region_id: RegionId::new(1024, 1),
1054            last_entry_id: None,
1055            metadata_last_entry_id: None,
1056            replay_timeout: Duration::from_millis(1000),
1057            location_id: None,
1058            replay_entry_id: None,
1059            metadata_replay_entry_id: None,
1060        }]);
1061
1062        let serialized = serde_json::to_string(&upgrade_region).unwrap();
1063        assert_eq!(
1064            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
1065            serialized
1066        );
1067    }
1068
1069    #[test]
1070    fn test_serialize_instruction_reply() {
1071        let downgrade_region_reply = InstructionReply::DowngradeRegions(
1072            DowngradeRegionsReply::single(DowngradeRegionReply {
1073                region_id: RegionId::new(1024, 1),
1074                last_entry_id: None,
1075                metadata_last_entry_id: None,
1076                exists: true,
1077                error: None,
1078            }),
1079        );
1080
1081        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
1082        assert_eq!(
1083            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
1084            serialized
1085        );
1086
1087        let upgrade_region_reply =
1088            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1089                region_id: RegionId::new(1024, 1),
1090                ready: true,
1091                exists: true,
1092                error: None,
1093            }));
1094        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
1095        assert_eq!(
1096            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
1097            serialized
1098        );
1099    }
1100
1101    #[test]
1102    fn test_deserialize_instruction() {
1103        // legacy open region instruction
1104        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
1105        let open_region_instruction: Instruction =
1106            serde_json::from_str(open_region_instruction).unwrap();
1107        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1108            RegionIdent {
1109                datanode_id: 2,
1110                table_id: 1024,
1111                region_number: 1,
1112                engine: "mito2".to_string(),
1113            },
1114            "test/foo",
1115            HashMap::new(),
1116            HashMap::new(),
1117            false,
1118        )]);
1119        assert_eq!(open_region_instruction, open_region);
1120
1121        // legacy close region instruction
1122        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
1123        let close_region_instruction: Instruction =
1124            serde_json::from_str(close_region_instruction).unwrap();
1125        let close_region = Instruction::CloseRegions(vec![RegionIdent {
1126            datanode_id: 2,
1127            table_id: 1024,
1128            region_number: 1,
1129            engine: "mito2".to_string(),
1130        }]);
1131        assert_eq!(close_region_instruction, close_region);
1132
1133        // legacy downgrade region instruction
1134        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
1135        let downgrade_region_instruction: Instruction =
1136            serde_json::from_str(downgrade_region_instruction).unwrap();
1137        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
1138            region_id: RegionId::new(1024, 1),
1139            flush_timeout: Some(Duration::from_millis(1000)),
1140        }]);
1141        assert_eq!(downgrade_region_instruction, downgrade_region);
1142
1143        // legacy upgrade region instruction
1144        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
1145        let upgrade_region_instruction: Instruction =
1146            serde_json::from_str(upgrade_region_instruction).unwrap();
1147        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1148            region_id: RegionId::new(1024, 1),
1149            last_entry_id: None,
1150            metadata_last_entry_id: None,
1151            replay_timeout: Duration::from_millis(1000),
1152            location_id: None,
1153            replay_entry_id: None,
1154            metadata_replay_entry_id: None,
1155        }]);
1156        assert_eq!(upgrade_region_instruction, upgrade_region);
1157    }
1158
1159    #[test]
1160    fn test_deserialize_instruction_reply() {
1161        // legacy close region reply
1162        let close_region_instruction_reply =
1163            r#"{"result":true,"error":null,"type":"close_region"}"#;
1164        let close_region_instruction_reply: InstructionReply =
1165            serde_json::from_str(close_region_instruction_reply).unwrap();
1166        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
1167            result: true,
1168            error: None,
1169        });
1170        assert_eq!(close_region_instruction_reply, close_region_reply);
1171
1172        // legacy open region reply
1173        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
1174        let open_region_instruction_reply: InstructionReply =
1175            serde_json::from_str(open_region_instruction_reply).unwrap();
1176        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1177            result: true,
1178            error: None,
1179        });
1180        assert_eq!(open_region_instruction_reply, open_region_reply);
1181
1182        // legacy downgrade region reply
1183        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1184        let downgrade_region_instruction_reply: InstructionReply =
1185            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1186        let downgrade_region_reply = InstructionReply::DowngradeRegions(
1187            DowngradeRegionsReply::single(DowngradeRegionReply {
1188                region_id: RegionId::new(1024, 1),
1189                last_entry_id: None,
1190                metadata_last_entry_id: None,
1191                exists: true,
1192                error: None,
1193            }),
1194        );
1195        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1196
1197        // legacy upgrade region reply
1198        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1199        let upgrade_region_instruction_reply: InstructionReply =
1200            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1201        let upgrade_region_reply =
1202            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1203                region_id: RegionId::new(1024, 1),
1204                ready: true,
1205                exists: true,
1206                error: None,
1207            }));
1208        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1209    }
1210
1211    #[derive(Debug, Clone, Serialize, Deserialize)]
1212    struct LegacyOpenRegion {
1213        region_ident: RegionIdent,
1214        region_storage_path: String,
1215        region_options: HashMap<String, String>,
1216    }
1217
1218    #[test]
1219    fn test_compatible_serialize_open_region() {
1220        let region_ident = RegionIdent {
1221            datanode_id: 2,
1222            table_id: 1024,
1223            region_number: 1,
1224            engine: "mito2".to_string(),
1225        };
1226        let region_storage_path = "test/foo".to_string();
1227        let region_options = HashMap::from([
1228            ("a".to_string(), "aa".to_string()),
1229            ("b".to_string(), "bb".to_string()),
1230        ]);
1231
1232        // Serialize a legacy OpenRegion.
1233        let legacy_open_region = LegacyOpenRegion {
1234            region_ident: region_ident.clone(),
1235            region_storage_path: region_storage_path.clone(),
1236            region_options: region_options.clone(),
1237        };
1238        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1239
1240        // Deserialize to OpenRegion.
1241        let deserialized = serde_json::from_str(&serialized).unwrap();
1242        let expected = OpenRegion {
1243            region_ident,
1244            region_storage_path,
1245            region_options,
1246            region_wal_options: HashMap::new(),
1247            skip_wal_replay: false,
1248        };
1249        assert_eq!(expected, deserialized);
1250    }
1251
1252    #[test]
1253    fn test_flush_regions_creation() {
1254        let region_id = RegionId::new(1024, 1);
1255
1256        // Single region sync flush
1257        let single_sync = FlushRegions::sync_single(region_id);
1258        assert_eq!(single_sync.region_ids, vec![region_id]);
1259        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1260        assert!(!single_sync.is_hint());
1261        assert!(single_sync.is_sync());
1262        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1263        assert!(single_sync.is_single_region());
1264        assert_eq!(single_sync.single_region_id(), Some(region_id));
1265
1266        // Batch async flush (hint)
1267        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1268        let batch_async = FlushRegions::async_batch(region_ids.clone());
1269        assert_eq!(batch_async.region_ids, region_ids);
1270        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1271        assert!(batch_async.is_hint());
1272        assert!(!batch_async.is_sync());
1273        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1274        assert!(!batch_async.is_single_region());
1275        assert_eq!(batch_async.single_region_id(), None);
1276
1277        // Batch sync flush
1278        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1279        assert_eq!(batch_sync.region_ids, region_ids);
1280        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1281        assert!(!batch_sync.is_hint());
1282        assert!(batch_sync.is_sync());
1283        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1284    }
1285
1286    #[test]
1287    fn test_flush_regions_conversion() {
1288        let region_id = RegionId::new(1024, 1);
1289
1290        let from_region_id: FlushRegions = region_id.into();
1291        assert_eq!(from_region_id.region_ids, vec![region_id]);
1292        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1293        assert!(!from_region_id.is_hint());
1294        assert!(from_region_id.is_sync());
1295
1296        // Test default construction
1297        let flush_regions = FlushRegions {
1298            region_ids: vec![region_id],
1299            strategy: FlushStrategy::Async,
1300            error_strategy: FlushErrorStrategy::TryAll,
1301        };
1302        assert_eq!(flush_regions.region_ids, vec![region_id]);
1303        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1304        assert!(flush_regions.is_hint());
1305        assert!(!flush_regions.is_sync());
1306    }
1307
1308    #[test]
1309    fn test_flush_region_reply() {
1310        let region_id = RegionId::new(1024, 1);
1311
1312        // Successful single region reply
1313        let success_reply = FlushRegionReply::success_single(region_id);
1314        assert!(success_reply.overall_success);
1315        assert_eq!(success_reply.results.len(), 1);
1316        assert_eq!(success_reply.results[0].0, region_id);
1317        assert!(success_reply.results[0].1.is_ok());
1318
1319        // Failed single region reply
1320        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1321        assert!(!error_reply.overall_success);
1322        assert_eq!(error_reply.results.len(), 1);
1323        assert_eq!(error_reply.results[0].0, region_id);
1324        assert!(error_reply.results[0].1.is_err());
1325
1326        // Batch reply
1327        let region_id2 = RegionId::new(1024, 2);
1328        let results = vec![
1329            (region_id, Ok(())),
1330            (region_id2, Err("flush failed".to_string())),
1331        ];
1332        let batch_reply = FlushRegionReply::from_results(results);
1333        assert!(!batch_reply.overall_success);
1334        assert_eq!(batch_reply.results.len(), 2);
1335
1336        // Conversion to SimpleReply
1337        let simple_reply = batch_reply.to_simple_reply();
1338        assert!(!simple_reply.result);
1339        assert!(simple_reply.error.is_some());
1340        assert!(simple_reply.error.unwrap().contains("flush failed"));
1341    }
1342
1343    #[test]
1344    fn test_serialize_flush_regions_instruction() {
1345        let region_id = RegionId::new(1024, 1);
1346        let flush_regions = FlushRegions::sync_single(region_id);
1347        let instruction = Instruction::FlushRegions(flush_regions.clone());
1348
1349        let serialized = serde_json::to_string(&instruction).unwrap();
1350        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1351
1352        match deserialized {
1353            Instruction::FlushRegions(fr) => {
1354                assert_eq!(fr.region_ids, vec![region_id]);
1355                assert_eq!(fr.strategy, FlushStrategy::Sync);
1356                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1357            }
1358            _ => panic!("Expected FlushRegions instruction"),
1359        }
1360    }
1361
1362    #[test]
1363    fn test_serialize_flush_regions_batch_instruction() {
1364        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1365        let flush_regions =
1366            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1367        let instruction = Instruction::FlushRegions(flush_regions);
1368
1369        let serialized = serde_json::to_string(&instruction).unwrap();
1370        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1371
1372        match deserialized {
1373            Instruction::FlushRegions(fr) => {
1374                assert_eq!(fr.region_ids, region_ids);
1375                assert_eq!(fr.strategy, FlushStrategy::Sync);
1376                assert!(!fr.is_hint());
1377                assert!(fr.is_sync());
1378                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1379            }
1380            _ => panic!("Expected FlushRegions instruction"),
1381        }
1382    }
1383
1384    #[test]
1385    fn test_serialize_get_file_refs_instruction_reply() {
1386        let mut manifest = FileRefsManifest::default();
1387        let r0 = RegionId::new(1024, 1);
1388        let r1 = RegionId::new(1024, 2);
1389        manifest.file_refs.insert(
1390            r0,
1391            HashSet::from([FileRef::new(r0, FileId::random(), None)]),
1392        );
1393        manifest.file_refs.insert(
1394            r1,
1395            HashSet::from([FileRef::new(r1, FileId::random(), None)]),
1396        );
1397        manifest.manifest_version.insert(r0, 10);
1398        manifest.manifest_version.insert(r1, 20);
1399
1400        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1401            file_refs_manifest: manifest,
1402            success: true,
1403            error: None,
1404        });
1405
1406        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1407        let deserialized = serde_json::from_str(&serialized).unwrap();
1408
1409        assert_eq!(instruction_reply, deserialized);
1410    }
1411}