common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use store_api::region_engine::SyncRegionFromRequest;
21use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
22use strum::Display;
23use table::metadata::TableId;
24use table::table_name::TableName;
25
26use crate::flow_name::FlowName;
27use crate::key::schema_name::SchemaName;
28use crate::key::{FlowId, FlowPartitionId};
29use crate::peer::Peer;
30use crate::{DatanodeId, FlownodeId};
31
32#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
33pub struct RegionIdent {
34    pub datanode_id: DatanodeId,
35    pub table_id: TableId,
36    pub region_number: RegionNumber,
37    pub engine: String,
38}
39
40impl RegionIdent {
41    pub fn get_region_id(&self) -> RegionId {
42        RegionId::new(self.table_id, self.region_number)
43    }
44}
45
46impl Display for RegionIdent {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        write!(
49            f,
50            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
51            self.datanode_id, self.table_id, self.region_number, self.engine
52        )
53    }
54}
55
56/// The result of downgrade leader region.
57#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
58pub struct DowngradeRegionReply {
59    /// The [RegionId].
60    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
61    #[serde(default)]
62    pub region_id: RegionId,
63    /// Returns the `last_entry_id` if available.
64    pub last_entry_id: Option<u64>,
65    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
66    pub metadata_last_entry_id: Option<u64>,
67    /// Indicates whether the region exists.
68    pub exists: bool,
69    /// Return error if any during the operation.
70    pub error: Option<String>,
71}
72
73impl Display for DowngradeRegionReply {
74    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75        write!(
76            f,
77            "(last_entry_id={:?}, exists={}, error={:?})",
78            self.last_entry_id, self.exists, self.error
79        )
80    }
81}
82
/// A minimal reply carrying a boolean outcome and an optional error message.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
    /// The boolean outcome of the operation.
    pub result: bool,
    /// The error message, if any.
    pub error: Option<String>,
}
88
89/// Reply for flush region operations with support for batch results.
90#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
91pub struct FlushRegionReply {
92    /// Results for each region that was attempted to be flushed.
93    /// For single region flushes, this will contain one result.
94    /// For batch flushes, this contains results for all attempted regions.
95    pub results: Vec<(RegionId, Result<(), String>)>,
96    /// Overall success: true if all regions were flushed successfully.
97    pub overall_success: bool,
98}
99
100impl FlushRegionReply {
101    /// Create a successful single region reply.
102    pub fn success_single(region_id: RegionId) -> Self {
103        Self {
104            results: vec![(region_id, Ok(()))],
105            overall_success: true,
106        }
107    }
108
109    /// Create a failed single region reply.
110    pub fn error_single(region_id: RegionId, error: String) -> Self {
111        Self {
112            results: vec![(region_id, Err(error))],
113            overall_success: false,
114        }
115    }
116
117    /// Create a batch reply from individual results.
118    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
119        let overall_success = results.iter().all(|(_, result)| result.is_ok());
120        Self {
121            results,
122            overall_success,
123        }
124    }
125
126    /// Convert to SimpleReply for backward compatibility.
127    pub fn to_simple_reply(&self) -> SimpleReply {
128        if self.overall_success {
129            SimpleReply {
130                result: true,
131                error: None,
132            }
133        } else {
134            let errors: Vec<String> = self
135                .results
136                .iter()
137                .filter_map(|(region_id, result)| {
138                    result
139                        .as_ref()
140                        .err()
141                        .map(|err| format!("{}: {}", region_id, err))
142                })
143                .collect();
144            SimpleReply {
145                result: false,
146                error: Some(errors.join("; ")),
147            }
148        }
149    }
150}
151
152impl Display for SimpleReply {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(f, "(result={}, error={:?})", self.result, self.error)
155    }
156}
157
158impl Display for FlushRegionReply {
159    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
160        let results_str = self
161            .results
162            .iter()
163            .map(|(region_id, result)| match result {
164                Ok(()) => format!("{}:OK", region_id),
165                Err(err) => format!("{}:ERR({})", region_id, err),
166            })
167            .collect::<Vec<_>>()
168            .join(", ");
169        write!(
170            f,
171            "(overall_success={}, results=[{}])",
172            self.overall_success, results_str
173        )
174    }
175}
176
177impl Display for OpenRegion {
178    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
179        write!(
180            f,
181            "OpenRegion(region_ident={}, region_storage_path={})",
182            self.region_ident, self.region_storage_path
183        )
184    }
185}
186
187#[serde_with::serde_as]
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
189pub struct OpenRegion {
190    pub region_ident: RegionIdent,
191    pub region_storage_path: String,
192    pub region_options: HashMap<String, String>,
193    #[serde(default)]
194    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
195    pub region_wal_options: HashMap<RegionNumber, String>,
196    #[serde(default)]
197    pub skip_wal_replay: bool,
198}
199
200impl OpenRegion {
201    pub fn new(
202        region_ident: RegionIdent,
203        path: &str,
204        region_options: HashMap<String, String>,
205        region_wal_options: HashMap<RegionNumber, String>,
206        skip_wal_replay: bool,
207    ) -> Self {
208        Self {
209            region_ident,
210            region_storage_path: path.to_string(),
211            region_options,
212            region_wal_options,
213            skip_wal_replay,
214        }
215    }
216}
217
218/// The instruction of downgrading leader region.
219#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
220pub struct DowngradeRegion {
221    /// The [RegionId].
222    pub region_id: RegionId,
223    /// The timeout of waiting for flush the region.
224    ///
225    /// `None` stands for don't flush before downgrading the region.
226    #[serde(default)]
227    pub flush_timeout: Option<Duration>,
228}
229
230impl Display for DowngradeRegion {
231    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232        write!(
233            f,
234            "DowngradeRegion(region_id={}, flush_timeout={:?})",
235            self.region_id, self.flush_timeout,
236        )
237    }
238}
239
240/// Upgrades a follower region to leader region.
241#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
242pub struct UpgradeRegion {
243    /// The [RegionId].
244    pub region_id: RegionId,
245    /// The `last_entry_id` of old leader region.
246    pub last_entry_id: Option<u64>,
247    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
248    pub metadata_last_entry_id: Option<u64>,
249    /// The timeout of waiting for a wal replay.
250    ///
251    /// `None` stands for no wait,
252    /// it's helpful to verify whether the leader region is ready.
253    #[serde(with = "humantime_serde")]
254    pub replay_timeout: Duration,
255    /// The hint for replaying memtable.
256    #[serde(default)]
257    pub location_id: Option<u64>,
258    #[serde(default, skip_serializing_if = "Option::is_none")]
259    pub replay_entry_id: Option<u64>,
260    #[serde(default, skip_serializing_if = "Option::is_none")]
261    pub metadata_replay_entry_id: Option<u64>,
262}
263
264impl UpgradeRegion {
265    /// Sets the replay entry id.
266    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
267        self.replay_entry_id = replay_entry_id;
268        self
269    }
270
271    /// Sets the metadata replay entry id.
272    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
273        self.metadata_replay_entry_id = metadata_replay_entry_id;
274        self
275    }
276}
277
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
/// The identifier of cache.
///
/// Each variant wraps the key (or payload) that names the cache entry.
pub enum CacheIdent {
    /// Identified by a flow id.
    FlowId(FlowId),
    /// Indicate change of address of flownode.
    FlowNodeAddressChange(u64),
    /// Identified by a flow name.
    FlowName(FlowName),
    /// Identified by a table id.
    TableId(TableId),
    /// Identified by a table name.
    TableName(TableName),
    /// Identified by a schema name.
    SchemaName(SchemaName),
    /// Carries a [CreateFlow] payload.
    CreateFlow(CreateFlow),
    /// Carries a [DropFlow] payload.
    DropFlow(DropFlow),
}
291
/// Payload of [CacheIdent::CreateFlow].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
    /// The unique identifier for the flow.
    pub flow_id: FlowId,
    /// The ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to peer information
    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
300
/// Payload of [CacheIdent::DropFlow].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
    /// The identifier of the flow.
    pub flow_id: FlowId,
    /// The ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to flownode id
    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
308
/// Strategy for executing flush operations.
///
/// [FlushStrategy::Sync] is the default, so it is also what
/// `#[serde(default)]` fields of this type fall back to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushStrategy {
    /// Synchronous operation that waits for completion and expects a reply
    #[default]
    Sync,
    /// Asynchronous hint operation (fire-and-forget, no reply expected)
    Async,
}
318
/// Error handling strategy for batch flush operations.
///
/// [FlushErrorStrategy::FailFast] is the default, so it is also what
/// `#[serde(default)]` fields of this type fall back to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushErrorStrategy {
    /// Abort on first error (fail-fast)
    #[default]
    FailFast,
    /// Attempt to flush all regions and collect all errors
    TryAll,
}
328
/// Unified flush instruction supporting both single and batch operations
/// with configurable execution strategies and error handling.
///
/// Both strategy fields are optional on the wire and fall back to their
/// `Default` values when missing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
    /// List of region IDs to flush. Can contain a single region or multiple regions.
    pub region_ids: Vec<RegionId>,
    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
    #[serde(default)]
    pub strategy: FlushStrategy,
    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
    #[serde(default)]
    pub error_strategy: FlushErrorStrategy,
}
342
343impl Display for FlushRegions {
344    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
345        write!(
346            f,
347            "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
348            self.region_ids, self.strategy, self.error_strategy
349        )
350    }
351}
352
353impl FlushRegions {
354    /// Create synchronous single-region flush
355    pub fn sync_single(region_id: RegionId) -> Self {
356        Self {
357            region_ids: vec![region_id],
358            strategy: FlushStrategy::Sync,
359            error_strategy: FlushErrorStrategy::FailFast,
360        }
361    }
362
363    /// Create asynchronous batch flush (fire-and-forget)
364    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
365        Self {
366            region_ids,
367            strategy: FlushStrategy::Async,
368            error_strategy: FlushErrorStrategy::TryAll,
369        }
370    }
371
372    /// Create synchronous batch flush with error strategy
373    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
374        Self {
375            region_ids,
376            strategy: FlushStrategy::Sync,
377            error_strategy,
378        }
379    }
380
381    /// Check if this is a single region flush.
382    pub fn is_single_region(&self) -> bool {
383        self.region_ids.len() == 1
384    }
385
386    /// Get the single region ID if this is a single region flush.
387    pub fn single_region_id(&self) -> Option<RegionId> {
388        if self.is_single_region() {
389            self.region_ids.first().copied()
390        } else {
391            None
392        }
393    }
394
395    /// Check if this is a hint (asynchronous) operation.
396    pub fn is_hint(&self) -> bool {
397        matches!(self.strategy, FlushStrategy::Async)
398    }
399
400    /// Check if this is a synchronous operation.
401    pub fn is_sync(&self) -> bool {
402        matches!(self.strategy, FlushStrategy::Sync)
403    }
404}
405
406impl From<RegionId> for FlushRegions {
407    fn from(region_id: RegionId) -> Self {
408        Self::sync_single(region_id)
409    }
410}
411
/// Deserialization helper that accepts either one `T` or a list of `T`.
///
/// Being `untagged`, serde tries the variants in declaration order, so
/// `Single` is attempted before `Multiple`.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A lone value.
    Single(T),
    /// A list of values.
    Multiple(Vec<T>),
}
418
419fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
420where
421    D: Deserializer<'de>,
422    T: Deserialize<'de>,
423{
424    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
425    Ok(match helper {
426        SingleOrMultiple::Single(x) => vec![x],
427        SingleOrMultiple::Multiple(xs) => xs,
428    })
429}
430
431/// Instruction to get file references for specified regions.
432#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
433pub struct GetFileRefs {
434    /// List of region IDs to get file references from active FileHandles (in-memory).
435    pub query_regions: Vec<RegionId>,
436    /// Mapping from the src region IDs (whose file references to look for) to
437    /// the dst region IDs (where to read the manifests).
438    /// Key: The source region IDs (where files originally came from).
439    /// Value: The set of destination region IDs (whose manifests need to be read).
440    pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
441}
442
443impl Display for GetFileRefs {
444    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
446    }
447}
448
449/// Instruction to trigger garbage collection for a region.
450#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
451pub struct GcRegions {
452    /// The region ID to perform GC on, only regions that are currently on the given datanode can be garbage collected, regions not on the datanode will report errors.
453    pub regions: Vec<RegionId>,
454    /// The file references manifest containing temporary file references.
455    pub file_refs_manifest: FileRefsManifest,
456    /// Whether to perform a full file listing to find orphan files.
457    pub full_file_listing: bool,
458}
459
460impl Display for GcRegions {
461    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
462        write!(
463            f,
464            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
465            self.regions,
466            self.file_refs_manifest.file_refs.len(),
467            self.full_file_listing
468        )
469    }
470}
471
472/// Reply for GetFileRefs instruction.
473#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
474pub struct GetFileRefsReply {
475    /// The file references manifest.
476    pub file_refs_manifest: FileRefsManifest,
477    /// Whether the operation was successful.
478    pub success: bool,
479    /// Error message if any.
480    pub error: Option<String>,
481}
482
483impl Display for GetFileRefsReply {
484    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
485        write!(
486            f,
487            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
488            self.success,
489            self.file_refs_manifest.file_refs.len(),
490            self.error
491        )
492    }
493}
494
495/// Reply for GC instruction.
496#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
497pub struct GcRegionsReply {
498    pub result: Result<GcReport, String>,
499}
500
501impl Display for GcRegionsReply {
502    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
503        write!(
504            f,
505            "GcReply(result={})",
506            match &self.result {
507                Ok(report) => format!(
508                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
509                    report.deleted_files.len(),
510                    report.need_retry_regions.len()
511                ),
512                Err(err) => format!("Err({})", err),
513            }
514        )
515    }
516}
517
518#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
519pub struct EnterStagingRegion {
520    pub region_id: RegionId,
521    #[serde(
522        alias = "partition_expr",
523        deserialize_with = "deserialize_enter_staging_partition_directive",
524        serialize_with = "serialize_enter_staging_partition_directive"
525    )]
526    pub partition_directive: StagingPartitionDirective,
527}
528
529impl Display for EnterStagingRegion {
530    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
531        write!(
532            f,
533            "EnterStagingRegion(region_id={}, partition_directive={})",
534            self.region_id, self.partition_directive
535        )
536    }
537}
538
/// The directive carried by an enter-staging request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StagingPartitionDirective {
    /// Carries a partition expression.
    UpdatePartitionExpr(String),
    /// Carries no expression.
    RejectAllWrites,
}

impl StagingPartitionDirective {
    /// Returns the partition expression carried by this directive, or `None`
    /// for [StagingPartitionDirective::RejectAllWrites].
    pub fn as_partition_expr(&self) -> Option<&str> {
        if let Self::UpdatePartitionExpr(expr) = self {
            Some(expr.as_str())
        } else {
            None
        }
    }
}

impl Display for StagingPartitionDirective {
    /// Prints the variant name, with the expression in parentheses when present.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self.as_partition_expr() {
            Some(expr) => write!(f, "UpdatePartitionExpr({})", expr),
            None => f.write_str("RejectAllWrites"),
        }
    }
}
563
564fn serialize_enter_staging_partition_directive<S>(
565    rule: &StagingPartitionDirective,
566    serializer: S,
567) -> std::result::Result<S::Ok, S::Error>
568where
569    S: Serializer,
570{
571    match rule {
572        StagingPartitionDirective::UpdatePartitionExpr(expr) => serializer.serialize_str(expr),
573        StagingPartitionDirective::RejectAllWrites => {
574            #[derive(Serialize)]
575            struct RejectAllWritesSer<'a> {
576                r#type: &'a str,
577            }
578
579            RejectAllWritesSer {
580                r#type: "reject_all_writes",
581            }
582            .serialize(serializer)
583        }
584    }
585}
586
587fn deserialize_enter_staging_partition_directive<'de, D>(
588    deserializer: D,
589) -> std::result::Result<StagingPartitionDirective, D::Error>
590where
591    D: Deserializer<'de>,
592{
593    #[derive(Deserialize)]
594    #[serde(untagged)]
595    enum Compat {
596        Legacy(String),
597        TypeTagged { r#type: String },
598    }
599
600    match Compat::deserialize(deserializer)? {
601        Compat::Legacy(expr) => Ok(StagingPartitionDirective::UpdatePartitionExpr(expr)),
602        Compat::TypeTagged { r#type } if r#type == "reject_all_writes" => {
603            Ok(StagingPartitionDirective::RejectAllWrites)
604        }
605        Compat::TypeTagged { r#type } => Err(serde::de::Error::custom(format!(
606            "Unknown enter staging partition directive type: {}",
607            r#type
608        ))),
609    }
610}
611
612/// Instruction payload for syncing a region from a manifest or another region.
613#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
614pub struct SyncRegion {
615    /// Region id to sync.
616    pub region_id: RegionId,
617    /// Request to sync the region.
618    pub request: SyncRegionFromRequest,
619}
620
621impl Display for SyncRegion {
622    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
623        write!(
624            f,
625            "SyncRegion(region_id={}, request={:?})",
626            self.region_id, self.request
627        )
628    }
629}
630
631#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
632pub struct RemapManifest {
633    pub region_id: RegionId,
634    /// Regions to remap manifests from.
635    pub input_regions: Vec<RegionId>,
636    /// For each old region, which new regions should receive its files
637    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
638    /// New partition expressions for the new regions.
639    pub new_partition_exprs: HashMap<RegionId, String>,
640}
641
642impl Display for RemapManifest {
643    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
644        write!(
645            f,
646            "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
647            self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
648        )
649    }
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
653pub struct ApplyStagingManifest {
654    /// The region ID to apply the staging manifest to.
655    pub region_id: RegionId,
656    /// The partition expression of the staging region.
657    pub partition_expr: String,
658    /// The region that stores the staging manifests in its staging blob storage.
659    pub central_region_id: RegionId,
660    /// The relative path to the staging manifest within the central region's staging blob storage.
661    pub manifest_path: String,
662}
663
664impl Display for ApplyStagingManifest {
665    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
666        write!(
667            f,
668            "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
669            self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
670        )
671    }
672}
673
/// The set of instructions, one variant per operation.
///
/// Variants annotated with `deserialize_with = "single_or_multiple_from"`
/// accept either a single payload or a list of payloads when deserializing,
/// and additionally answer to the singular variant name given in `alias`.
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
    /// Opens regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
    OpenRegions(Vec<OpenRegion>),
    /// Closes regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
    CloseRegions(Vec<RegionIdent>),
    /// Upgrades regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
    UpgradeRegions(Vec<UpgradeRegion>),
    #[serde(
        deserialize_with = "single_or_multiple_from",
        alias = "DowngradeRegion"
    )]
    /// Downgrades regions.
    DowngradeRegions(Vec<DowngradeRegion>),
    /// Invalidates batch cache.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegions(FlushRegions),
    /// Gets file references for regions.
    GetFileRefs(GetFileRefs),
    /// Triggers garbage collection for a region.
    GcRegions(GcRegions),
    /// Temporary suspend serving reads or writes
    Suspend,
    /// Makes regions enter staging state.
    EnterStagingRegions(Vec<EnterStagingRegion>),
    /// Syncs regions.
    SyncRegions(Vec<SyncRegion>),
    /// Remaps manifests for a region.
    RemapManifest(RemapManifest),

    /// Applies staging manifests for a region.
    ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
711
712impl Instruction {
713    /// Converts the instruction into a vector of [OpenRegion].
714    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
715        match self {
716            Self::OpenRegions(open_regions) => Some(open_regions),
717            _ => None,
718        }
719    }
720
721    /// Converts the instruction into a vector of [RegionIdent].
722    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
723        match self {
724            Self::CloseRegions(close_regions) => Some(close_regions),
725            _ => None,
726        }
727    }
728
729    /// Converts the instruction into a [FlushRegions].
730    pub fn into_flush_regions(self) -> Option<FlushRegions> {
731        match self {
732            Self::FlushRegions(flush_regions) => Some(flush_regions),
733            _ => None,
734        }
735    }
736
737    /// Converts the instruction into a [DowngradeRegion].
738    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
739        match self {
740            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
741            _ => None,
742        }
743    }
744
745    /// Converts the instruction into a [UpgradeRegion].
746    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
747        match self {
748            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
749            _ => None,
750        }
751    }
752
753    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
754        match self {
755            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
756            _ => None,
757        }
758    }
759
760    pub fn into_gc_regions(self) -> Option<GcRegions> {
761        match self {
762            Self::GcRegions(gc_regions) => Some(gc_regions),
763            _ => None,
764        }
765    }
766
767    pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
768        match self {
769            Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
770            _ => None,
771        }
772    }
773
774    pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
775        match self {
776            Self::SyncRegions(sync_regions) => Some(sync_regions),
777            _ => None,
778        }
779    }
780}
781
782/// The reply of [UpgradeRegion].
783#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
784pub struct UpgradeRegionReply {
785    /// The [RegionId].
786    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
787    #[serde(default)]
788    pub region_id: RegionId,
789    /// Returns true if `last_entry_id` has been replayed to the latest.
790    pub ready: bool,
791    /// Indicates whether the region exists.
792    pub exists: bool,
793    /// Returns error if any.
794    pub error: Option<String>,
795}
796
797impl Display for UpgradeRegionReply {
798    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
799        write!(
800            f,
801            "(ready={}, exists={}, error={:?})",
802            self.ready, self.exists, self.error
803        )
804    }
805}
806
807#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
808pub struct DowngradeRegionsReply {
809    pub replies: Vec<DowngradeRegionReply>,
810}
811
812impl DowngradeRegionsReply {
813    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
814        Self { replies }
815    }
816
817    pub fn single(reply: DowngradeRegionReply) -> Self {
818        Self::new(vec![reply])
819    }
820}
821
/// Deserialization helper accepting either a lone [DowngradeRegionReply] or a
/// [DowngradeRegionsReply].
///
/// Being `untagged`, serde tries `Single` before `Multiple`.
#[derive(Deserialize)]
#[serde(untagged)]
enum DowngradeRegionsCompat {
    /// The single-reply shape.
    Single(DowngradeRegionReply),
    /// The batch shape.
    Multiple(DowngradeRegionsReply),
}
828
829fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
830where
831    D: Deserializer<'de>,
832{
833    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
834    Ok(match helper {
835        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
836        DowngradeRegionsCompat::Multiple(reply) => reply,
837    })
838}
839
840#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
841pub struct UpgradeRegionsReply {
842    pub replies: Vec<UpgradeRegionReply>,
843}
844
845impl UpgradeRegionsReply {
846    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
847        Self { replies }
848    }
849
850    pub fn single(reply: UpgradeRegionReply) -> Self {
851        Self::new(vec![reply])
852    }
853}
854
/// Deserialization helper accepting either a lone [UpgradeRegionReply] or an
/// [UpgradeRegionsReply].
///
/// Being `untagged`, serde tries `Single` before `Multiple`.
#[derive(Deserialize)]
#[serde(untagged)]
enum UpgradeRegionsCompat {
    /// The single-reply shape.
    Single(UpgradeRegionReply),
    /// The batch shape.
    Multiple(UpgradeRegionsReply),
}
861
862fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
863where
864    D: Deserializer<'de>,
865{
866    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
867    Ok(match helper {
868        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
869        UpgradeRegionsCompat::Multiple(reply) => reply,
870    })
871}
872
/// Reply for a single enter-staging request.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionReply {
    /// The id of the region this reply refers to.
    pub region_id: RegionId,
    /// Returns true if the region has entered staging with the target directive.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
883
884#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
885pub struct EnterStagingRegionsReply {
886    pub replies: Vec<EnterStagingRegionReply>,
887}
888
889impl EnterStagingRegionsReply {
890    pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
891        Self { replies }
892    }
893}
894
/// Reply for a single region sync request.
///
/// Carries the region id, readiness and existence flags, and an optional
/// error message.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionReply {
    /// Region id of the synced region.
    pub region_id: RegionId,
    /// Returns true if the region is successfully synced and ready.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error message if any during the operation.
    pub error: Option<String>,
}
907
908/// Reply for a batch of region sync requests.
909#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
910pub struct SyncRegionsReply {
911    pub replies: Vec<SyncRegionReply>,
912}
913
914impl SyncRegionsReply {
915    pub fn new(replies: Vec<SyncRegionReply>) -> Self {
916        Self { replies }
917    }
918}
919
/// Reply payload of [`InstructionReply::RemapManifest`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
    /// Returns false if the region does not exist.
    pub exists: bool,
    /// A map from region IDs to their corresponding remapped manifest paths.
    pub manifest_paths: HashMap<RegionId, String>,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
929
930impl Display for RemapManifestReply {
931    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
932        write!(
933            f,
934            "RemapManifestReply(manifest_paths={:?}, error={:?})",
935            self.manifest_paths, self.error
936        )
937    }
938}
939
/// A list of [`ApplyStagingManifestReply`] values, carried by
/// [`InstructionReply::ApplyStagingManifests`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestsReply {
    /// The individual replies, in the order they were supplied.
    pub replies: Vec<ApplyStagingManifestReply>,
}
944
945impl ApplyStagingManifestsReply {
946    pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
947        Self { replies }
948    }
949}
950
/// Outcome of applying a staging manifest for one region.
///
/// Values of this type are collected in [`ApplyStagingManifestsReply::replies`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestReply {
    /// Id of the region this reply refers to.
    pub region_id: RegionId,
    /// Returns true if the region is ready to serve reads and writes.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Return error if any during the operation.
    pub error: Option<String>,
}
961
/// Reply to an instruction, with one variant per kind of reply.
///
/// Serde writes the variant as a `type` field next to the payload's own fields, using the
/// snake_case variant name (for example `"type":"downgrade_regions"`). The singular `alias`
/// names let payloads tagged with the singular form (for example `"type":"close_region"`)
/// deserialize into the plural variants.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
    /// Also accepts the tag `open_region`.
    #[serde(alias = "open_region")]
    OpenRegions(SimpleReply),
    /// Also accepts the tag `close_region`.
    #[serde(alias = "close_region")]
    CloseRegions(SimpleReply),
    /// Also accepts the tag `upgrade_region`. Deserialized through
    /// `upgrade_regions_compat_from`; the tests in this file feed it a single flattened
    /// reply and expect a one-element `replies` list.
    #[serde(
        deserialize_with = "upgrade_regions_compat_from",
        alias = "upgrade_region"
    )]
    UpgradeRegions(UpgradeRegionsReply),
    /// Also accepts the tag `downgrade_region`. Deserialized through
    /// `downgrade_regions_compat_from`; the tests in this file feed it a single flattened
    /// reply and expect a one-element `replies` list.
    #[serde(
        alias = "downgrade_region",
        deserialize_with = "downgrade_regions_compat_from"
    )]
    DowngradeRegions(DowngradeRegionsReply),
    /// Reply carrying a [`FlushRegionReply`].
    FlushRegions(FlushRegionReply),
    /// Reply carrying a [`GetFileRefsReply`].
    GetFileRefs(GetFileRefsReply),
    /// Reply carrying a [`GcRegionsReply`].
    GcRegions(GcRegionsReply),
    /// Reply carrying an [`EnterStagingRegionsReply`].
    EnterStagingRegions(EnterStagingRegionsReply),
    /// Reply carrying a [`SyncRegionsReply`].
    SyncRegions(SyncRegionsReply),
    /// Reply carrying a [`RemapManifestReply`].
    RemapManifest(RemapManifestReply),
    /// Reply carrying an [`ApplyStagingManifestsReply`].
    ApplyStagingManifests(ApplyStagingManifestsReply),
}
987
988impl Display for InstructionReply {
989    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
990        match self {
991            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
992            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
993            Self::UpgradeRegions(reply) => {
994                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
995            }
996            Self::DowngradeRegions(reply) => {
997                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
998            }
999            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
1000            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
1001            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
1002            Self::EnterStagingRegions(reply) => {
1003                write!(
1004                    f,
1005                    "InstructionReply::EnterStagingRegions({:?})",
1006                    reply.replies
1007                )
1008            }
1009            Self::SyncRegions(reply) => {
1010                write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
1011            }
1012            Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
1013            Self::ApplyStagingManifests(reply) => write!(
1014                f,
1015                "InstructionReply::ApplyStagingManifests({:?})",
1016                reply.replies
1017            ),
1018        }
1019    }
1020}
1021
/// Test-only helpers that unwrap one specific [`InstructionReply`] variant.
///
/// Each helper returns the payload of the expected variant and panics on any other variant.
/// The panic message names the expected reply and prints the reply that was actually
/// received. `#[track_caller]` makes the reported panic location the calling code rather
/// than this helper.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Returns the payload of `CloseRegions`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        match self {
            Self::CloseRegions(reply) => reply,
            other => panic!("Expected CloseRegions reply, got {}", other),
        }
    }

    /// Returns the payload of `OpenRegions`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        match self {
            Self::OpenRegions(reply) => reply,
            other => panic!("Expected OpenRegions reply, got {}", other),
        }
    }

    /// Returns the `replies` list of `UpgradeRegions`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        match self {
            Self::UpgradeRegions(reply) => reply.replies,
            other => panic!("Expected UpgradeRegion reply, got {}", other),
        }
    }

    /// Returns the `replies` list of `DowngradeRegions`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        match self {
            Self::DowngradeRegions(reply) => reply.replies,
            other => panic!("Expected DowngradeRegion reply, got {}", other),
        }
    }

    /// Returns the payload of `FlushRegions`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        match self {
            Self::FlushRegions(reply) => reply,
            other => panic!("Expected FlushRegions reply, got {}", other),
        }
    }

    /// Returns the `replies` list of `EnterStagingRegions`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
        match self {
            Self::EnterStagingRegions(reply) => reply.replies,
            other => panic!("Expected EnterStagingRegion reply, got {}", other),
        }
    }

    /// Returns the `replies` list of `SyncRegions`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
        match self {
            Self::SyncRegions(reply) => reply.replies,
            other => panic!("Expected SyncRegion reply, got {}", other),
        }
    }

    /// Returns the payload of `RemapManifest`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
        match self {
            Self::RemapManifest(reply) => reply,
            other => panic!("Expected RemapManifest reply, got {}", other),
        }
    }

    /// Returns the `replies` list of `ApplyStagingManifests`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    #[track_caller]
    pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
        match self {
            Self::ApplyStagingManifests(reply) => reply.replies,
            other => panic!("Expected ApplyStagingManifest reply, got {}", other),
        }
    }
}
1087
1088#[cfg(test)]
1089mod tests {
1090    use std::collections::HashSet;
1091
1092    use store_api::storage::{FileId, FileRef};
1093
1094    use super::*;
1095
1096    #[test]
1097    fn test_serialize_instruction() {
1098        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1099            RegionIdent {
1100                datanode_id: 2,
1101                table_id: 1024,
1102                region_number: 1,
1103                engine: "mito2".to_string(),
1104            },
1105            "test/foo",
1106            HashMap::new(),
1107            HashMap::new(),
1108            false,
1109        )]);
1110
1111        let serialized = serde_json::to_string(&open_region).unwrap();
1112        assert_eq!(
1113            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
1114            serialized
1115        );
1116
1117        let close_region = Instruction::CloseRegions(vec![RegionIdent {
1118            datanode_id: 2,
1119            table_id: 1024,
1120            region_number: 1,
1121            engine: "mito2".to_string(),
1122        }]);
1123
1124        let serialized = serde_json::to_string(&close_region).unwrap();
1125        assert_eq!(
1126            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
1127            serialized
1128        );
1129
1130        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1131            region_id: RegionId::new(1024, 1),
1132            last_entry_id: None,
1133            metadata_last_entry_id: None,
1134            replay_timeout: Duration::from_millis(1000),
1135            location_id: None,
1136            replay_entry_id: None,
1137            metadata_replay_entry_id: None,
1138        }]);
1139
1140        let serialized = serde_json::to_string(&upgrade_region).unwrap();
1141        assert_eq!(
1142            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
1143            serialized
1144        );
1145    }
1146
1147    #[test]
1148    fn test_serialize_instruction_reply() {
1149        let downgrade_region_reply = InstructionReply::DowngradeRegions(
1150            DowngradeRegionsReply::single(DowngradeRegionReply {
1151                region_id: RegionId::new(1024, 1),
1152                last_entry_id: None,
1153                metadata_last_entry_id: None,
1154                exists: true,
1155                error: None,
1156            }),
1157        );
1158
1159        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
1160        assert_eq!(
1161            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
1162            serialized
1163        );
1164
1165        let upgrade_region_reply =
1166            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1167                region_id: RegionId::new(1024, 1),
1168                ready: true,
1169                exists: true,
1170                error: None,
1171            }));
1172        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
1173        assert_eq!(
1174            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
1175            serialized
1176        );
1177    }
1178
1179    #[test]
1180    fn test_deserialize_instruction() {
1181        // legacy open region instruction
1182        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
1183        let open_region_instruction: Instruction =
1184            serde_json::from_str(open_region_instruction).unwrap();
1185        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
1186            RegionIdent {
1187                datanode_id: 2,
1188                table_id: 1024,
1189                region_number: 1,
1190                engine: "mito2".to_string(),
1191            },
1192            "test/foo",
1193            HashMap::new(),
1194            HashMap::new(),
1195            false,
1196        )]);
1197        assert_eq!(open_region_instruction, open_region);
1198
1199        // legacy close region instruction
1200        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
1201        let close_region_instruction: Instruction =
1202            serde_json::from_str(close_region_instruction).unwrap();
1203        let close_region = Instruction::CloseRegions(vec![RegionIdent {
1204            datanode_id: 2,
1205            table_id: 1024,
1206            region_number: 1,
1207            engine: "mito2".to_string(),
1208        }]);
1209        assert_eq!(close_region_instruction, close_region);
1210
1211        // legacy downgrade region instruction
1212        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
1213        let downgrade_region_instruction: Instruction =
1214            serde_json::from_str(downgrade_region_instruction).unwrap();
1215        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
1216            region_id: RegionId::new(1024, 1),
1217            flush_timeout: Some(Duration::from_millis(1000)),
1218        }]);
1219        assert_eq!(downgrade_region_instruction, downgrade_region);
1220
1221        // legacy upgrade region instruction
1222        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
1223        let upgrade_region_instruction: Instruction =
1224            serde_json::from_str(upgrade_region_instruction).unwrap();
1225        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
1226            region_id: RegionId::new(1024, 1),
1227            last_entry_id: None,
1228            metadata_last_entry_id: None,
1229            replay_timeout: Duration::from_millis(1000),
1230            location_id: None,
1231            replay_entry_id: None,
1232            metadata_replay_entry_id: None,
1233        }]);
1234        assert_eq!(upgrade_region_instruction, upgrade_region);
1235    }
1236
1237    #[test]
1238    fn test_deserialize_instruction_reply() {
1239        // legacy close region reply
1240        let close_region_instruction_reply =
1241            r#"{"result":true,"error":null,"type":"close_region"}"#;
1242        let close_region_instruction_reply: InstructionReply =
1243            serde_json::from_str(close_region_instruction_reply).unwrap();
1244        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
1245            result: true,
1246            error: None,
1247        });
1248        assert_eq!(close_region_instruction_reply, close_region_reply);
1249
1250        // legacy open region reply
1251        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
1252        let open_region_instruction_reply: InstructionReply =
1253            serde_json::from_str(open_region_instruction_reply).unwrap();
1254        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1255            result: true,
1256            error: None,
1257        });
1258        assert_eq!(open_region_instruction_reply, open_region_reply);
1259
1260        // legacy downgrade region reply
1261        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1262        let downgrade_region_instruction_reply: InstructionReply =
1263            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1264        let downgrade_region_reply = InstructionReply::DowngradeRegions(
1265            DowngradeRegionsReply::single(DowngradeRegionReply {
1266                region_id: RegionId::new(1024, 1),
1267                last_entry_id: None,
1268                metadata_last_entry_id: None,
1269                exists: true,
1270                error: None,
1271            }),
1272        );
1273        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1274
1275        // legacy upgrade region reply
1276        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1277        let upgrade_region_instruction_reply: InstructionReply =
1278            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1279        let upgrade_region_reply =
1280            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1281                region_id: RegionId::new(1024, 1),
1282                ready: true,
1283                exists: true,
1284                error: None,
1285            }));
1286        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1287    }
1288
1289    #[test]
1290    fn test_enter_staging_partition_rule_compatibility() {
1291        let legacy = r#"{"region_id":4398046511105,"partition_expr":"{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"}"#;
1292        let enter: EnterStagingRegion = serde_json::from_str(legacy).unwrap();
1293        assert_eq!(enter.region_id, RegionId::new(1024, 1));
1294        assert_eq!(
1295            enter.partition_directive,
1296            StagingPartitionDirective::UpdatePartitionExpr(
1297                "{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"
1298                    .to_string()
1299            )
1300        );
1301
1302        let serialized = serde_json::to_string(&enter).unwrap();
1303        assert!(serialized.contains("\"partition_directive\":\""));
1304        assert!(!serialized.contains("partition_expr"));
1305
1306        let reject = r#"{"region_id":4398046511105,"partition_expr":{"type":"reject_all_writes"}}"#;
1307        let enter: EnterStagingRegion = serde_json::from_str(reject).unwrap();
1308        assert_eq!(
1309            enter.partition_directive,
1310            StagingPartitionDirective::RejectAllWrites
1311        );
1312    }
1313
    /// Test-only stand-in for an `OpenRegion` payload that carries just these three fields.
    ///
    /// `test_compatible_serialize_open_region` serializes it to obtain JSON without
    /// `region_wal_options` and `skip_wal_replay`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }
1320
1321    #[test]
1322    fn test_compatible_serialize_open_region() {
1323        let region_ident = RegionIdent {
1324            datanode_id: 2,
1325            table_id: 1024,
1326            region_number: 1,
1327            engine: "mito2".to_string(),
1328        };
1329        let region_storage_path = "test/foo".to_string();
1330        let region_options = HashMap::from([
1331            ("a".to_string(), "aa".to_string()),
1332            ("b".to_string(), "bb".to_string()),
1333        ]);
1334
1335        // Serialize a legacy OpenRegion.
1336        let legacy_open_region = LegacyOpenRegion {
1337            region_ident: region_ident.clone(),
1338            region_storage_path: region_storage_path.clone(),
1339            region_options: region_options.clone(),
1340        };
1341        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1342
1343        // Deserialize to OpenRegion.
1344        let deserialized = serde_json::from_str(&serialized).unwrap();
1345        let expected = OpenRegion {
1346            region_ident,
1347            region_storage_path,
1348            region_options,
1349            region_wal_options: HashMap::new(),
1350            skip_wal_replay: false,
1351        };
1352        assert_eq!(expected, deserialized);
1353    }
1354
1355    #[test]
1356    fn test_flush_regions_creation() {
1357        let region_id = RegionId::new(1024, 1);
1358
1359        // Single region sync flush
1360        let single_sync = FlushRegions::sync_single(region_id);
1361        assert_eq!(single_sync.region_ids, vec![region_id]);
1362        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1363        assert!(!single_sync.is_hint());
1364        assert!(single_sync.is_sync());
1365        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1366        assert!(single_sync.is_single_region());
1367        assert_eq!(single_sync.single_region_id(), Some(region_id));
1368
1369        // Batch async flush (hint)
1370        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1371        let batch_async = FlushRegions::async_batch(region_ids.clone());
1372        assert_eq!(batch_async.region_ids, region_ids);
1373        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1374        assert!(batch_async.is_hint());
1375        assert!(!batch_async.is_sync());
1376        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1377        assert!(!batch_async.is_single_region());
1378        assert_eq!(batch_async.single_region_id(), None);
1379
1380        // Batch sync flush
1381        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1382        assert_eq!(batch_sync.region_ids, region_ids);
1383        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1384        assert!(!batch_sync.is_hint());
1385        assert!(batch_sync.is_sync());
1386        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1387    }
1388
1389    #[test]
1390    fn test_flush_regions_conversion() {
1391        let region_id = RegionId::new(1024, 1);
1392
1393        let from_region_id: FlushRegions = region_id.into();
1394        assert_eq!(from_region_id.region_ids, vec![region_id]);
1395        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1396        assert!(!from_region_id.is_hint());
1397        assert!(from_region_id.is_sync());
1398
1399        // Test default construction
1400        let flush_regions = FlushRegions {
1401            region_ids: vec![region_id],
1402            strategy: FlushStrategy::Async,
1403            error_strategy: FlushErrorStrategy::TryAll,
1404        };
1405        assert_eq!(flush_regions.region_ids, vec![region_id]);
1406        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1407        assert!(flush_regions.is_hint());
1408        assert!(!flush_regions.is_sync());
1409    }
1410
1411    #[test]
1412    fn test_flush_region_reply() {
1413        let region_id = RegionId::new(1024, 1);
1414
1415        // Successful single region reply
1416        let success_reply = FlushRegionReply::success_single(region_id);
1417        assert!(success_reply.overall_success);
1418        assert_eq!(success_reply.results.len(), 1);
1419        assert_eq!(success_reply.results[0].0, region_id);
1420        assert!(success_reply.results[0].1.is_ok());
1421
1422        // Failed single region reply
1423        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1424        assert!(!error_reply.overall_success);
1425        assert_eq!(error_reply.results.len(), 1);
1426        assert_eq!(error_reply.results[0].0, region_id);
1427        assert!(error_reply.results[0].1.is_err());
1428
1429        // Batch reply
1430        let region_id2 = RegionId::new(1024, 2);
1431        let results = vec![
1432            (region_id, Ok(())),
1433            (region_id2, Err("flush failed".to_string())),
1434        ];
1435        let batch_reply = FlushRegionReply::from_results(results);
1436        assert!(!batch_reply.overall_success);
1437        assert_eq!(batch_reply.results.len(), 2);
1438
1439        // Conversion to SimpleReply
1440        let simple_reply = batch_reply.to_simple_reply();
1441        assert!(!simple_reply.result);
1442        assert!(simple_reply.error.is_some());
1443        assert!(simple_reply.error.unwrap().contains("flush failed"));
1444    }
1445
1446    #[test]
1447    fn test_serialize_flush_regions_instruction() {
1448        let region_id = RegionId::new(1024, 1);
1449        let flush_regions = FlushRegions::sync_single(region_id);
1450        let instruction = Instruction::FlushRegions(flush_regions.clone());
1451
1452        let serialized = serde_json::to_string(&instruction).unwrap();
1453        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1454
1455        match deserialized {
1456            Instruction::FlushRegions(fr) => {
1457                assert_eq!(fr.region_ids, vec![region_id]);
1458                assert_eq!(fr.strategy, FlushStrategy::Sync);
1459                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1460            }
1461            _ => panic!("Expected FlushRegions instruction"),
1462        }
1463    }
1464
1465    #[test]
1466    fn test_serialize_flush_regions_batch_instruction() {
1467        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1468        let flush_regions =
1469            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1470        let instruction = Instruction::FlushRegions(flush_regions);
1471
1472        let serialized = serde_json::to_string(&instruction).unwrap();
1473        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1474
1475        match deserialized {
1476            Instruction::FlushRegions(fr) => {
1477                assert_eq!(fr.region_ids, region_ids);
1478                assert_eq!(fr.strategy, FlushStrategy::Sync);
1479                assert!(!fr.is_hint());
1480                assert!(fr.is_sync());
1481                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1482            }
1483            _ => panic!("Expected FlushRegions instruction"),
1484        }
1485    }
1486
1487    #[test]
1488    fn test_serialize_get_file_refs_instruction_reply() {
1489        let mut manifest = FileRefsManifest::default();
1490        let r0 = RegionId::new(1024, 1);
1491        let r1 = RegionId::new(1024, 2);
1492        manifest.file_refs.insert(
1493            r0,
1494            HashSet::from([FileRef::new(r0, FileId::random(), None)]),
1495        );
1496        manifest.file_refs.insert(
1497            r1,
1498            HashSet::from([FileRef::new(r1, FileId::random(), None)]),
1499        );
1500        manifest.manifest_version.insert(r0, 10);
1501        manifest.manifest_version.insert(r1, 20);
1502
1503        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1504            file_refs_manifest: manifest,
1505            success: true,
1506            error: None,
1507        });
1508
1509        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1510        let deserialized = serde_json::from_str(&serialized).unwrap();
1511
1512        assert_eq!(instruction_reply, deserialized);
1513    }
1514}