Skip to main content

common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use store_api::region_engine::SyncRegionFromRequest;
21use store_api::region_request::{RegionFlushReason, RegionRequirements};
22use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
23use strum::Display;
24use table::metadata::TableId;
25use table::table_name::TableName;
26
27use crate::flow_name::FlowName;
28use crate::key::schema_name::SchemaName;
29use crate::key::{FlowId, FlowPartitionId};
30use crate::peer::Peer;
31use crate::wal_provider::{RegionWalOptions, region_wal_options_serde};
32use crate::{DatanodeId, FlownodeId};
33
/// Identifies a region by table id and region number, together with a datanode id
/// and an engine name.
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct RegionIdent {
    /// Id of the datanode associated with the region.
    pub datanode_id: DatanodeId,
    /// Id of the table; combined with `region_number` to build the [RegionId].
    pub table_id: TableId,
    /// Number of the region; combined with `table_id` to build the [RegionId].
    pub region_number: RegionNumber,
    /// Engine name (presumably the engine that serves the region — TODO confirm).
    pub engine: String,
}
41
42impl RegionIdent {
43    pub fn get_region_id(&self) -> RegionId {
44        RegionId::new(self.table_id, self.region_number)
45    }
46}
47
48impl Display for RegionIdent {
49    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50        write!(
51            f,
52            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
53            self.datanode_id, self.table_id, self.region_number, self.engine
54        )
55    }
56}
57
/// The result of downgrading a leader region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
    /// The [RegionId].
    /// For compatibility, a missing field falls back to the `Default` value of
    /// [RegionId] via `#[serde(default)]` (documented here as `RegionId::new(0, 0)`).
    #[serde(default)]
    pub region_id: RegionId,
    /// Returns the `last_entry_id` if available.
    pub last_entry_id: Option<u64>,
    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// The error message, if any error occurred during the operation.
    pub error: Option<String>,
}
74
75impl Display for DowngradeRegionReply {
76    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77        write!(
78            f,
79            "(last_entry_id={:?}, exists={}, error={:?})",
80            self.last_entry_id, self.exists, self.error
81        )
82    }
83}
84
/// A minimal reply carrying a boolean outcome and an optional error message.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
    /// The boolean outcome of the operation.
    pub result: bool,
    /// The error message, if any.
    pub error: Option<String>,
}
90
/// Reply for flush region operations with support for batch results.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct FlushRegionReply {
    /// Results for each region that was attempted to be flushed.
    /// For single region flushes, this will contain one result.
    /// For batch flushes, this contains results for all attempted regions.
    /// Each entry pairs the region id with `Ok(())` or an error message.
    pub results: Vec<(RegionId, Result<(), String>)>,
    /// Overall success: true if all regions were flushed successfully.
    /// The constructors on this type keep it consistent with `results`, but it is a
    /// separate public (and serialized) field.
    pub overall_success: bool,
}
101
102impl FlushRegionReply {
103    /// Create a successful single region reply.
104    pub fn success_single(region_id: RegionId) -> Self {
105        Self {
106            results: vec![(region_id, Ok(()))],
107            overall_success: true,
108        }
109    }
110
111    /// Create a failed single region reply.
112    pub fn error_single(region_id: RegionId, error: String) -> Self {
113        Self {
114            results: vec![(region_id, Err(error))],
115            overall_success: false,
116        }
117    }
118
119    /// Create a batch reply from individual results.
120    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
121        let overall_success = results.iter().all(|(_, result)| result.is_ok());
122        Self {
123            results,
124            overall_success,
125        }
126    }
127
128    /// Convert to SimpleReply for backward compatibility.
129    pub fn to_simple_reply(&self) -> SimpleReply {
130        if self.overall_success {
131            SimpleReply {
132                result: true,
133                error: None,
134            }
135        } else {
136            let errors: Vec<String> = self
137                .results
138                .iter()
139                .filter_map(|(region_id, result)| {
140                    result
141                        .as_ref()
142                        .err()
143                        .map(|err| format!("{}: {}", region_id, err))
144                })
145                .collect();
146            SimpleReply {
147                result: false,
148                error: Some(errors.join("; ")),
149            }
150        }
151    }
152}
153
154impl Display for SimpleReply {
155    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
156        write!(f, "(result={}, error={:?})", self.result, self.error)
157    }
158}
159
160impl Display for FlushRegionReply {
161    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
162        let results_str = self
163            .results
164            .iter()
165            .map(|(region_id, result)| match result {
166                Ok(()) => format!("{}:OK", region_id),
167                Err(err) => format!("{}:ERR({})", region_id, err),
168            })
169            .collect::<Vec<_>>()
170            .join(", ");
171        write!(
172            f,
173            "(overall_success={}, results=[{}])",
174            self.overall_success, results_str
175        )
176    }
177}
178
179impl Display for OpenRegion {
180    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
181        write!(
182            f,
183            "OpenRegion(region_ident={}, region_storage_path={}, reason={:?})",
184            self.region_ident, self.region_storage_path, self.reason
185        )
186    }
187}
188
/// The reason why an open region instruction is triggered.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum OpenRegionReason {
    /// Open triggered before region migration.
    RegionMigration,
    /// Open triggered by region failover.
    RegionFailover,
    /// Open triggered when adding a follower region.
    /// Only compiled in when the `enterprise` feature is enabled.
    #[cfg(feature = "enterprise")]
    RegionFollower,
}
200
/// Payload describing one region to open.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
    /// Identifier of the region to open.
    pub region_ident: RegionIdent,
    /// Storage path of the region.
    pub region_storage_path: String,
    /// Region options as string key/value pairs.
    pub region_options: HashMap<String, String>,
    /// WAL options, (de)serialized through `region_wal_options_serde`;
    /// falls back to the default when the field is absent.
    #[serde(default)]
    #[serde(with = "region_wal_options_serde")]
    pub region_wal_options: RegionWalOptions,
    /// Whether to skip WAL replay (presumably while opening the region — TODO confirm).
    /// Defaults to `false` when the field is absent.
    #[serde(default)]
    pub skip_wal_replay: bool,
    /// Why the open was triggered; left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<OpenRegionReason>,
    /// Region requirements; falls back to the default when the field is absent.
    #[serde(default)]
    pub requirements: RegionRequirements,
}
217
218impl OpenRegion {
219    pub fn new(
220        region_ident: RegionIdent,
221        path: &str,
222        region_options: HashMap<String, String>,
223        region_wal_options: RegionWalOptions,
224        skip_wal_replay: bool,
225        reason: Option<OpenRegionReason>,
226        requirements: RegionRequirements,
227    ) -> Self {
228        Self {
229            region_ident,
230            region_storage_path: path.to_string(),
231            region_options,
232            region_wal_options,
233            skip_wal_replay,
234            reason,
235            requirements,
236        }
237    }
238}
239
/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The timeout of waiting for the region flush.
    ///
    /// `None` means: don't flush before downgrading the region.
    /// A missing field deserializes to `None` via `#[serde(default)]`.
    #[serde(default)]
    pub flush_timeout: Option<Duration>,
}
251
252impl Display for DowngradeRegion {
253    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
254        write!(
255            f,
256            "DowngradeRegion(region_id={}, flush_timeout={:?})",
257            self.region_id, self.flush_timeout,
258        )
259    }
260}
261
/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UpgradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The `last_entry_id` of old leader region.
    pub last_entry_id: Option<u64>,
    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// The timeout of waiting for a wal replay.
    ///
    /// (De)serialized in human-readable form through `humantime_serde`.
    /// NOTE(review): this field is a plain [Duration], so it has no `None` state;
    /// presumably a zero duration means "no wait", which is helpful to verify
    /// whether the leader region is ready — confirm against the handler.
    #[serde(with = "humantime_serde")]
    pub replay_timeout: Duration,
    /// The hint for replaying memtable.
    #[serde(default)]
    pub location_id: Option<u64>,
    /// Optional replay entry id; left out of the serialized form when `None`.
    /// NOTE(review): exact replay semantics are not visible in this file — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay_entry_id: Option<u64>,
    /// Optional metadata replay entry id; left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata_replay_entry_id: Option<u64>,
}
285
286impl UpgradeRegion {
287    /// Sets the replay entry id.
288    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
289        self.replay_entry_id = replay_entry_id;
290        self
291    }
292
293    /// Sets the metadata replay entry id.
294    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
295        self.metadata_replay_entry_id = metadata_replay_entry_id;
296        self
297    }
298}
299
/// The identifier of cache.
///
/// Each variant names a cache entry by a different kind of key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CacheIdent {
    /// Identified by flow id.
    FlowId(FlowId),
    /// Indicate change of address of flownode.
    /// (The `u64` is presumably the flownode id — TODO confirm.)
    FlowNodeAddressChange(u64),
    /// Identified by flow name.
    FlowName(FlowName),
    /// Identified by table id.
    TableId(TableId),
    /// Identified by table name.
    TableName(TableName),
    /// Identified by schema name.
    SchemaName(SchemaName),
    /// Carries the details of a created flow.
    CreateFlow(CreateFlow),
    /// Carries the details of a dropped flow.
    DropFlow(DropFlow),
    /// Indicate change of user metadata.
    User(UserCacheIdent),
}
315
/// Cache identifier for a user, made of a catalog name and a username.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UserCacheIdent {
    /// The catalog name.
    pub catalog: String,
    /// The username.
    pub username: String,
}
321
/// Cache payload describing a created flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
    /// The unique identifier for the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to peer information
    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
330
/// Cache payload describing a dropped flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
    /// The identifier of the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to flownode id
    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
338
/// Strategy for executing flush operations.
///
/// The default is [FlushStrategy::Sync].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushStrategy {
    /// Synchronous operation that waits for completion and expects a reply
    #[default]
    Sync,
    /// Asynchronous hint operation (fire-and-forget, no reply expected)
    Async,
}
348
/// Error handling strategy for batch flush operations.
///
/// The default is [FlushErrorStrategy::FailFast].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushErrorStrategy {
    /// Abort on first error (fail-fast)
    #[default]
    FailFast,
    /// Attempt to flush all regions and collect all errors
    TryAll,
}
358
/// Unified flush instruction supporting both single and batch operations
/// with configurable execution strategies and error handling.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
    /// List of region IDs to flush. Can contain a single region or multiple regions.
    pub region_ids: Vec<RegionId>,
    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
    /// Falls back to the default (`Sync`) when the field is absent.
    #[serde(default)]
    pub strategy: FlushStrategy,
    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
    /// Falls back to the default (`FailFast`) when the field is absent.
    #[serde(default)]
    pub error_strategy: FlushErrorStrategy,
    /// The source that triggered this flush; left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<RegionFlushReason>,
}
375
376impl Display for FlushRegions {
377    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
378        write!(
379            f,
380            "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?}, reason={:?})",
381            self.region_ids, self.strategy, self.error_strategy, self.reason
382        )
383    }
384}
385
386impl FlushRegions {
387    /// Create synchronous single-region flush
388    pub fn sync_single(region_id: RegionId) -> Self {
389        Self {
390            region_ids: vec![region_id],
391            strategy: FlushStrategy::Sync,
392            error_strategy: FlushErrorStrategy::FailFast,
393            reason: None,
394        }
395    }
396
397    /// Create asynchronous batch flush (fire-and-forget)
398    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
399        Self {
400            region_ids,
401            strategy: FlushStrategy::Async,
402            error_strategy: FlushErrorStrategy::TryAll,
403            reason: None,
404        }
405    }
406
407    /// Create synchronous batch flush with error strategy
408    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
409        Self {
410            region_ids,
411            strategy: FlushStrategy::Sync,
412            error_strategy,
413            reason: None,
414        }
415    }
416
417    pub fn with_reason(mut self, reason: RegionFlushReason) -> Self {
418        self.reason = Some(reason);
419        self
420    }
421
422    /// Check if this is a single region flush.
423    pub fn is_single_region(&self) -> bool {
424        self.region_ids.len() == 1
425    }
426
427    /// Get the single region ID if this is a single region flush.
428    pub fn single_region_id(&self) -> Option<RegionId> {
429        if self.is_single_region() {
430            self.region_ids.first().copied()
431        } else {
432            None
433        }
434    }
435
436    /// Check if this is a hint (asynchronous) operation.
437    pub fn is_hint(&self) -> bool {
438        matches!(self.strategy, FlushStrategy::Async)
439    }
440
441    /// Check if this is a synchronous operation.
442    pub fn is_sync(&self) -> bool {
443        matches!(self.strategy, FlushStrategy::Sync)
444    }
445}
446
447impl From<RegionId> for FlushRegions {
448    fn from(region_id: RegionId) -> Self {
449        Self::sync_single(region_id)
450    }
451}
452
/// Deserialization helper that accepts either one `T` or a list of `T`.
///
/// The enum is `untagged`, so serde tries the variants in declaration order:
/// `Single` first, then `Multiple`. Do not reorder them.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A single value.
    Single(T),
    /// A list of values.
    Multiple(Vec<T>),
}
459
460fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
461where
462    D: Deserializer<'de>,
463    T: Deserialize<'de>,
464{
465    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
466    Ok(match helper {
467        SingleOrMultiple::Single(x) => vec![x],
468        SingleOrMultiple::Multiple(xs) => xs,
469    })
470}
471
/// Instruction to get file references for specified regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
    /// List of region IDs to get file references from active FileHandles (in-memory).
    pub query_regions: Vec<RegionId>,
    /// Mapping from the source region IDs (whose file references to look for) to
    /// the destination region IDs (where to read the manifests).
    /// Key: The source region ID (where files originally came from).
    /// Value: The set of destination region IDs (whose manifests need to be read).
    pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
}
483
484impl Display for GetFileRefs {
485    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
486        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
487    }
488}
489
/// Instruction to trigger garbage collection for a set of regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
    /// The region IDs to perform GC on. Only regions that are currently on the given
    /// datanode can be garbage collected; regions not on the datanode will report errors.
    pub regions: Vec<RegionId>,
    /// The file references manifest containing temporary file references.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether to perform a full file listing to find orphan files.
    pub full_file_listing: bool,
}
500
501impl Display for GcRegions {
502    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
503        write!(
504            f,
505            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
506            self.regions,
507            self.file_refs_manifest.file_refs.len(),
508            self.full_file_listing
509        )
510    }
511}
512
/// Reply for the [GetFileRefs] instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefsReply {
    /// The file references manifest.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether the operation was successful.
    pub success: bool,
    /// The error message, if any.
    pub error: Option<String>,
}
523
524impl Display for GetFileRefsReply {
525    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
526        write!(
527            f,
528            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
529            self.success,
530            self.file_refs_manifest.file_refs.len(),
531            self.error
532        )
533    }
534}
535
/// Reply for the [GcRegions] instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegionsReply {
    /// The GC report on success, or an error message on failure.
    pub result: Result<GcReport, String>,
}
541
542impl Display for GcRegionsReply {
543    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
544        write!(
545            f,
546            "GcReply(result={})",
547            match &self.result {
548                Ok(report) => format!(
549                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
550                    report.deleted_files.len(),
551                    report.need_retry_regions.len()
552                ),
553                Err(err) => format!("Err({})", err),
554            }
555        )
556    }
557}
558
/// Payload asking one region to enter the staging state with a partition directive.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EnterStagingRegion {
    /// The region to move into staging.
    pub region_id: RegionId,
    /// The directive to apply while staging.
    ///
    /// Uses custom (de)serializers; the field is also accepted under the alias
    /// `partition_expr` when deserializing.
    #[serde(
        alias = "partition_expr",
        deserialize_with = "deserialize_enter_staging_partition_directive",
        serialize_with = "serialize_enter_staging_partition_directive"
    )]
    pub partition_directive: StagingPartitionDirective,
}
569
570impl Display for EnterStagingRegion {
571    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
572        write!(
573            f,
574            "EnterStagingRegion(region_id={}, partition_directive={})",
575            self.region_id, self.partition_directive
576        )
577    }
578}
579
/// Directive applied to a region that enters staging.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StagingPartitionDirective {
    /// Carries a partition expression string.
    UpdatePartitionExpr(String),
    /// Carries no payload.
    RejectAllWrites,
}

impl StagingPartitionDirective {
    /// Borrows the partition expression when the directive carries one.
    pub fn as_partition_expr(&self) -> Option<&str> {
        match self {
            Self::RejectAllWrites => None,
            Self::UpdatePartitionExpr(expr) => Some(expr.as_str()),
        }
    }
}

impl Display for StagingPartitionDirective {
    /// Prints the variant name, with the expression in parentheses when present.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RejectAllWrites => f.write_str("RejectAllWrites"),
            Self::UpdatePartitionExpr(expr) => {
                f.write_fmt(format_args!("UpdatePartitionExpr({})", expr))
            }
        }
    }
}
604
605fn serialize_enter_staging_partition_directive<S>(
606    rule: &StagingPartitionDirective,
607    serializer: S,
608) -> std::result::Result<S::Ok, S::Error>
609where
610    S: Serializer,
611{
612    match rule {
613        StagingPartitionDirective::UpdatePartitionExpr(expr) => serializer.serialize_str(expr),
614        StagingPartitionDirective::RejectAllWrites => {
615            #[derive(Serialize)]
616            struct RejectAllWritesSer<'a> {
617                r#type: &'a str,
618            }
619
620            RejectAllWritesSer {
621                r#type: "reject_all_writes",
622            }
623            .serialize(serializer)
624        }
625    }
626}
627
628fn deserialize_enter_staging_partition_directive<'de, D>(
629    deserializer: D,
630) -> std::result::Result<StagingPartitionDirective, D::Error>
631where
632    D: Deserializer<'de>,
633{
634    #[derive(Deserialize)]
635    #[serde(untagged)]
636    enum Compat {
637        Legacy(String),
638        TypeTagged { r#type: String },
639    }
640
641    match Compat::deserialize(deserializer)? {
642        Compat::Legacy(expr) => Ok(StagingPartitionDirective::UpdatePartitionExpr(expr)),
643        Compat::TypeTagged { r#type } if r#type == "reject_all_writes" => {
644            Ok(StagingPartitionDirective::RejectAllWrites)
645        }
646        Compat::TypeTagged { r#type } => Err(serde::de::Error::custom(format!(
647            "Unknown enter staging partition directive type: {}",
648            r#type
649        ))),
650    }
651}
652
/// Instruction payload for syncing a region from a manifest or another region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncRegion {
    /// Id of the region to sync.
    pub region_id: RegionId,
    /// The sync request describing where to sync the region from.
    pub request: SyncRegionFromRequest,
}
661
662impl Display for SyncRegion {
663    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
664        write!(
665            f,
666            "SyncRegion(region_id={}, request={:?})",
667            self.region_id, self.request
668        )
669    }
670}
671
/// Instruction payload for remapping manifests.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
    /// The region this instruction is addressed to.
    /// NOTE(review): its exact role in the remap is not visible in this file — confirm.
    pub region_id: RegionId,
    /// Regions to remap manifests from.
    pub input_regions: Vec<RegionId>,
    /// For each old region, which new regions should receive its files
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub new_partition_exprs: HashMap<RegionId, String>,
}
682
683impl Display for RemapManifest {
684    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
685        write!(
686            f,
687            "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
688            self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
689        )
690    }
691}
692
/// Instruction payload for applying a staging manifest to one region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApplyStagingManifest {
    /// The region ID to apply the staging manifest to.
    pub region_id: RegionId,
    /// The partition expression of the staging region.
    pub partition_expr: String,
    /// The region that stores the staging manifests in its staging blob storage.
    pub central_region_id: RegionId,
    /// The relative path to the staging manifest within the central region's staging blob storage.
    pub manifest_path: String,
}
704
705impl Display for ApplyStagingManifest {
706    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
707        write!(
708            f,
709            "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
710            self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
711        )
712    }
713}
714
/// An instruction together with its payload.
///
/// The first four variants accept, when deserializing, either a single payload or a
/// list of payloads (through `single_or_multiple_from`) and are also recognised under
/// their former singular names via `alias`.
/// The `Display` derive here is the `strum` one.
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
    /// Opens regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
    OpenRegions(Vec<OpenRegion>),
    /// Closes regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
    CloseRegions(Vec<RegionIdent>),
    /// Upgrades regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
    UpgradeRegions(Vec<UpgradeRegion>),
    /// Downgrades regions.
    #[serde(
        deserialize_with = "single_or_multiple_from",
        alias = "DowngradeRegion"
    )]
    DowngradeRegions(Vec<DowngradeRegion>),
    /// Invalidates batch cache.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegions(FlushRegions),
    /// Gets file references for regions.
    GetFileRefs(GetFileRefs),
    /// Triggers garbage collection for regions.
    GcRegions(GcRegions),
    /// Temporarily suspends serving reads or writes.
    Suspend,
    /// Makes regions enter staging state.
    EnterStagingRegions(Vec<EnterStagingRegion>),
    /// Syncs regions.
    SyncRegions(Vec<SyncRegion>),
    /// Remaps manifests for a region.
    RemapManifest(RemapManifest),

    /// Applies staging manifests for regions.
    ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
752
753impl Instruction {
754    /// Converts the instruction into a vector of [OpenRegion].
755    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
756        match self {
757            Self::OpenRegions(open_regions) => Some(open_regions),
758            _ => None,
759        }
760    }
761
762    /// Converts the instruction into a vector of [RegionIdent].
763    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
764        match self {
765            Self::CloseRegions(close_regions) => Some(close_regions),
766            _ => None,
767        }
768    }
769
770    /// Converts the instruction into a [FlushRegions].
771    pub fn into_flush_regions(self) -> Option<FlushRegions> {
772        match self {
773            Self::FlushRegions(flush_regions) => Some(flush_regions),
774            _ => None,
775        }
776    }
777
778    /// Converts the instruction into a [DowngradeRegion].
779    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
780        match self {
781            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
782            _ => None,
783        }
784    }
785
786    /// Converts the instruction into a [UpgradeRegion].
787    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
788        match self {
789            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
790            _ => None,
791        }
792    }
793
794    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
795        match self {
796            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
797            _ => None,
798        }
799    }
800
801    pub fn into_gc_regions(self) -> Option<GcRegions> {
802        match self {
803            Self::GcRegions(gc_regions) => Some(gc_regions),
804            _ => None,
805        }
806    }
807
808    pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
809        match self {
810            Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
811            _ => None,
812        }
813    }
814
815    pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
816        match self {
817            Self::SyncRegions(sync_regions) => Some(sync_regions),
818            _ => None,
819        }
820    }
821}
822
/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
    /// The [RegionId].
    /// For compatibility, a missing field falls back to the `Default` value of
    /// [RegionId] via `#[serde(default)]` (documented here as `RegionId::new(0, 0)`).
    #[serde(default)]
    pub region_id: RegionId,
    /// Returns true if `last_entry_id` has been replayed to the latest.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// The error message, if any.
    pub error: Option<String>,
}
837
838impl Display for UpgradeRegionReply {
839    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
840        write!(
841            f,
842            "(ready={}, exists={}, error={:?})",
843            self.ready, self.exists, self.error
844        )
845    }
846}
847
848#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
849pub struct DowngradeRegionsReply {
850    pub replies: Vec<DowngradeRegionReply>,
851}
852
853impl DowngradeRegionsReply {
854    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
855        Self { replies }
856    }
857
858    pub fn single(reply: DowngradeRegionReply) -> Self {
859        Self::new(vec![reply])
860    }
861}
862
863#[derive(Deserialize)]
864#[serde(untagged)]
865enum DowngradeRegionsCompat {
866    Single(DowngradeRegionReply),
867    Multiple(DowngradeRegionsReply),
868}
869
870fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
871where
872    D: Deserializer<'de>,
873{
874    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
875    Ok(match helper {
876        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
877        DowngradeRegionsCompat::Multiple(reply) => reply,
878    })
879}
880
881#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
882pub struct UpgradeRegionsReply {
883    pub replies: Vec<UpgradeRegionReply>,
884}
885
886impl UpgradeRegionsReply {
887    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
888        Self { replies }
889    }
890
891    pub fn single(reply: UpgradeRegionReply) -> Self {
892        Self::new(vec![reply])
893    }
894}
895
896#[derive(Deserialize)]
897#[serde(untagged)]
898enum UpgradeRegionsCompat {
899    Single(UpgradeRegionReply),
900    Multiple(UpgradeRegionsReply),
901}
902
903fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
904where
905    D: Deserializer<'de>,
906{
907    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
908    Ok(match helper {
909        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
910        UpgradeRegionsCompat::Multiple(reply) => reply,
911    })
912}
913
/// Reply for a single enter-staging request.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionReply {
    /// Id of the region this reply is about.
    pub region_id: RegionId,
    /// Returns true if the region has entered staging with the target directive.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// The error message, if any error occurred during the operation.
    pub error: Option<String>,
}
924
/// Batched payload of [`InstructionReply::EnterStagingRegions`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionsReply {
    /// The individual [`EnterStagingRegionReply`] entries of the batch.
    pub replies: Vec<EnterStagingRegionReply>,
}
929
930impl EnterStagingRegionsReply {
931    pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
932        Self { replies }
933    }
934}
935
/// Reply for a single region sync request.
///
/// Used as the per-region entry of a [`SyncRegionsReply`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionReply {
    /// Region id of the synced region.
    pub region_id: RegionId,
    /// Returns true if the region is successfully synced and ready.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Error message, if any occurred during the operation.
    pub error: Option<String>,
}
948
/// Reply for a batch of region sync requests.
///
/// This is the payload of [`InstructionReply::SyncRegions`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionsReply {
    /// The individual [`SyncRegionReply`] entries of the batch.
    pub replies: Vec<SyncRegionReply>,
}
954
955impl SyncRegionsReply {
956    pub fn new(replies: Vec<SyncRegionReply>) -> Self {
957        Self { replies }
958    }
959}
960
/// Payload of [`InstructionReply::RemapManifest`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
    /// Returns false if the region does not exist.
    pub exists: bool,
    /// A map from region IDs to their corresponding remapped manifest paths.
    pub manifest_paths: HashMap<RegionId, String>,
    /// Error message, if any occurred during the operation.
    pub error: Option<String>,
}
970
971impl Display for RemapManifestReply {
972    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
973        write!(
974            f,
975            "RemapManifestReply(manifest_paths={:?}, error={:?})",
976            self.manifest_paths, self.error
977        )
978    }
979}
980
/// Batched payload of [`InstructionReply::ApplyStagingManifests`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestsReply {
    /// The individual [`ApplyStagingManifestReply`] entries of the batch.
    pub replies: Vec<ApplyStagingManifestReply>,
}
985
986impl ApplyStagingManifestsReply {
987    pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
988        Self { replies }
989    }
990}
991
/// Per-region entry of an [`ApplyStagingManifestsReply`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestReply {
    /// Id of the region this entry refers to.
    pub region_id: RegionId,
    /// Returns true if the region is ready to serve reads and writes.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Error message, if any occurred during the operation.
    pub error: Option<String>,
}
1002
/// Reply sent back for an instruction.
///
/// Serialized as an internally tagged enum: the snake_case variant name is written
/// under the `type` key, next to the fields of the wrapped reply
/// (e.g. `{"type":"upgrade_regions","replies":[...]}`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
    /// Reply for opening regions; the singular `open_region` tag is also accepted.
    #[serde(alias = "open_region")]
    OpenRegions(SimpleReply),
    /// Reply for closing regions; the singular `close_region` tag is also accepted.
    #[serde(alias = "close_region")]
    CloseRegions(SimpleReply),
    /// Reply for upgrading regions.
    ///
    /// Accepts the singular `upgrade_region` tag, and a payload holding either one
    /// [`UpgradeRegionReply`] or a full [`UpgradeRegionsReply`].
    #[serde(
        deserialize_with = "upgrade_regions_compat_from",
        alias = "upgrade_region"
    )]
    UpgradeRegions(UpgradeRegionsReply),
    /// Reply for downgrading regions.
    ///
    /// Accepts the singular `downgrade_region` tag, and a payload holding either one
    /// [`DowngradeRegionReply`] or a full [`DowngradeRegionsReply`].
    #[serde(
        alias = "downgrade_region",
        deserialize_with = "downgrade_regions_compat_from"
    )]
    DowngradeRegions(DowngradeRegionsReply),
    /// Reply for flushing regions.
    FlushRegions(FlushRegionReply),
    /// Reply carrying a [`GetFileRefsReply`].
    GetFileRefs(GetFileRefsReply),
    /// Reply carrying a [`GcRegionsReply`].
    GcRegions(GcRegionsReply),
    /// Batched per-region replies for entering staging.
    EnterStagingRegions(EnterStagingRegionsReply),
    /// Batched per-region replies for syncing regions.
    SyncRegions(SyncRegionsReply),
    /// Reply for remapping manifests.
    RemapManifest(RemapManifestReply),
    /// Batched per-region replies for applying staging manifests.
    ApplyStagingManifests(ApplyStagingManifestsReply),
}
1028
1029impl Display for InstructionReply {
1030    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1031        match self {
1032            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
1033            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
1034            Self::UpgradeRegions(reply) => {
1035                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
1036            }
1037            Self::DowngradeRegions(reply) => {
1038                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
1039            }
1040            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
1041            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
1042            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
1043            Self::EnterStagingRegions(reply) => {
1044                write!(
1045                    f,
1046                    "InstructionReply::EnterStagingRegions({:?})",
1047                    reply.replies
1048                )
1049            }
1050            Self::SyncRegions(reply) => {
1051                write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
1052            }
1053            Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
1054            Self::ApplyStagingManifests(reply) => write!(
1055                f,
1056                "InstructionReply::ApplyStagingManifests({:?})",
1057                reply.replies
1058            ),
1059        }
1060    }
1061}
1062
/// Test-only helpers that unwrap a specific [`InstructionReply`] variant.
///
/// # Panics
///
/// Every helper panics when `self` is a different variant than the one it expects. The
/// panic message starts with the expected kind and ends with the reply that was actually
/// received, so a failing test shows what came back.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Returns the payload of a `CloseRegions` reply.
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        match self {
            Self::CloseRegions(reply) => reply,
            other => panic!("Expected CloseRegions reply, but got: {}", other),
        }
    }

    /// Returns the payload of an `OpenRegions` reply.
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        match self {
            Self::OpenRegions(reply) => reply,
            other => panic!("Expected OpenRegions reply, but got: {}", other),
        }
    }

    /// Returns the per-region replies of an `UpgradeRegions` reply.
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        match self {
            Self::UpgradeRegions(reply) => reply.replies,
            other => panic!("Expected UpgradeRegion reply, but got: {}", other),
        }
    }

    /// Returns the per-region replies of a `DowngradeRegions` reply.
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        match self {
            Self::DowngradeRegions(reply) => reply.replies,
            other => panic!("Expected DowngradeRegion reply, but got: {}", other),
        }
    }

    /// Returns the payload of a `FlushRegions` reply.
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        match self {
            Self::FlushRegions(reply) => reply,
            other => panic!("Expected FlushRegions reply, but got: {}", other),
        }
    }

    /// Returns the per-region replies of an `EnterStagingRegions` reply.
    pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
        match self {
            Self::EnterStagingRegions(reply) => reply.replies,
            other => panic!("Expected EnterStagingRegion reply, but got: {}", other),
        }
    }

    /// Returns the per-region replies of a `SyncRegions` reply.
    pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
        match self {
            Self::SyncRegions(reply) => reply.replies,
            other => panic!("Expected SyncRegion reply, but got: {}", other),
        }
    }

    /// Returns the payload of a `RemapManifest` reply.
    pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
        match self {
            Self::RemapManifest(reply) => reply,
            other => panic!("Expected RemapManifest reply, but got: {}", other),
        }
    }

    /// Returns the per-region replies of an `ApplyStagingManifests` reply.
    pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
        match self {
            Self::ApplyStagingManifests(reply) => reply.replies,
            other => panic!("Expected ApplyStagingManifest reply, but got: {}", other),
        }
    }
}
1128
1129#[cfg(test)]
1130mod tests {
1131    use std::collections::HashSet;
1132
1133    use common_wal::options::WalOptions;
1134    use store_api::storage::{FileId, FileRef};
1135
1136    use super::*;
1137
    /// Pins the exact JSON emitted for `OpenRegions`, `CloseRegions` and `UpgradeRegions`
    /// instructions.
    #[test]
    fn test_serialize_instruction() {
        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
            RegionIdent {
                datanode_id: 2,
                table_id: 1024,
                region_number: 1,
                engine: "mito2".to_string(),
            },
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
            None,
            RegionRequirements::empty(),
        )]);

        // The expected payload has no `reason` key although `None` was passed above.
        let serialized = serde_json::to_string(&open_region).unwrap();
        assert_eq!(
            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false,"requirements":{"object_storage":false}}]}"#,
            serialized
        );

        let close_region = Instruction::CloseRegions(vec![RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        }]);

        let serialized = serde_json::to_string(&close_region).unwrap();
        assert_eq!(
            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
            serialized
        );

        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id: RegionId::new(1024, 1),
            last_entry_id: None,
            metadata_last_entry_id: None,
            replay_timeout: Duration::from_millis(1000),
            location_id: None,
            replay_entry_id: None,
            metadata_replay_entry_id: None,
        }]);

        // The expected payload has no `replay_entry_id` / `metadata_replay_entry_id` keys
        // although both fields are set to `None` above.
        let serialized = serde_json::to_string(&upgrade_region).unwrap();
        assert_eq!(
            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
            serialized
        );
    }
1190
    /// Pins the exact JSON emitted for downgrade and upgrade replies: the plural,
    /// snake_case tag under `type` plus a `replies` array.
    #[test]
    fn test_serialize_instruction_reply() {
        let downgrade_region_reply = InstructionReply::DowngradeRegions(
            DowngradeRegionsReply::single(DowngradeRegionReply {
                region_id: RegionId::new(1024, 1),
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: true,
                error: None,
            }),
        );

        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
        assert_eq!(
            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
            serialized
        );

        let upgrade_region_reply =
            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
                region_id: RegionId::new(1024, 1),
                ready: true,
                exists: true,
                error: None,
            }));
        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
        assert_eq!(
            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
            serialized
        );
    }
1222
    /// Checks that instruction payloads holding a single object (instead of an array)
    /// deserialize into the batched variants with a one-element vector.
    #[test]
    fn test_deserialize_instruction() {
        // legacy open region instruction
        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
        let open_region_instruction: Instruction =
            serde_json::from_str(open_region_instruction).unwrap();
        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
            RegionIdent {
                datanode_id: 2,
                table_id: 1024,
                region_number: 1,
                engine: "mito2".to_string(),
            },
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
            None,
            RegionRequirements::empty(),
        )]);
        assert_eq!(open_region_instruction, open_region);

        // legacy close region instruction
        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
        let close_region_instruction: Instruction =
            serde_json::from_str(close_region_instruction).unwrap();
        let close_region = Instruction::CloseRegions(vec![RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        }]);
        assert_eq!(close_region_instruction, close_region);

        // legacy downgrade region instruction
        // NOTE(review): unlike the other legacy cases in this test, this payload uses the
        // plural `DowngradeRegions` key with a single object - confirm whether the singular
        // legacy key was meant to be exercised here.
        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
        let downgrade_region_instruction: Instruction =
            serde_json::from_str(downgrade_region_instruction).unwrap();
        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(1024, 1),
            flush_timeout: Some(Duration::from_millis(1000)),
        }]);
        assert_eq!(downgrade_region_instruction, downgrade_region);

        // legacy upgrade region instruction
        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
        let upgrade_region_instruction: Instruction =
            serde_json::from_str(upgrade_region_instruction).unwrap();
        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id: RegionId::new(1024, 1),
            last_entry_id: None,
            metadata_last_entry_id: None,
            replay_timeout: Duration::from_millis(1000),
            location_id: None,
            replay_entry_id: None,
            metadata_replay_entry_id: None,
        }]);
        assert_eq!(upgrade_region_instruction, upgrade_region);
    }
1282
    /// Checks that replies tagged with the singular legacy `type` values deserialize into
    /// the plural variants, and that flat single-region downgrade/upgrade payloads are
    /// wrapped into one-element batches.
    #[test]
    fn test_deserialize_instruction_reply() {
        // legacy close region reply
        let close_region_instruction_reply =
            r#"{"result":true,"error":null,"type":"close_region"}"#;
        let close_region_instruction_reply: InstructionReply =
            serde_json::from_str(close_region_instruction_reply).unwrap();
        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
            result: true,
            error: None,
        });
        assert_eq!(close_region_instruction_reply, close_region_reply);

        // legacy open region reply
        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
        let open_region_instruction_reply: InstructionReply =
            serde_json::from_str(open_region_instruction_reply).unwrap();
        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
            result: true,
            error: None,
        });
        assert_eq!(open_region_instruction_reply, open_region_reply);

        // legacy downgrade region reply
        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
        let downgrade_region_instruction_reply: InstructionReply =
            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
        let downgrade_region_reply = InstructionReply::DowngradeRegions(
            DowngradeRegionsReply::single(DowngradeRegionReply {
                region_id: RegionId::new(1024, 1),
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: true,
                error: None,
            }),
        );
        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);

        // legacy upgrade region reply
        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
        let upgrade_region_instruction_reply: InstructionReply =
            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
        let upgrade_region_reply =
            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
                region_id: RegionId::new(1024, 1),
                ready: true,
                exists: true,
                error: None,
            }));
        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
    }
1334
    /// Checks that `EnterStagingRegion` still accepts the legacy `partition_expr` key and
    /// writes the value back under `partition_directive`.
    #[test]
    fn test_enter_staging_partition_rule_compatibility() {
        // A string under the legacy key becomes `UpdatePartitionExpr` holding that string.
        let legacy = r#"{"region_id":4398046511105,"partition_expr":"{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"}"#;
        let enter: EnterStagingRegion = serde_json::from_str(legacy).unwrap();
        assert_eq!(enter.region_id, RegionId::new(1024, 1));
        assert_eq!(
            enter.partition_directive,
            StagingPartitionDirective::UpdatePartitionExpr(
                "{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"
                    .to_string()
            )
        );

        // Serialization uses the `partition_directive` key and never the legacy one.
        let serialized = serde_json::to_string(&enter).unwrap();
        assert!(serialized.contains("\"partition_directive\":\""));
        assert!(!serialized.contains("partition_expr"));

        // An object tagged `reject_all_writes` under the legacy key is accepted as well.
        let reject = r#"{"region_id":4398046511105,"partition_expr":{"type":"reject_all_writes"}}"#;
        let enter: EnterStagingRegion = serde_json::from_str(reject).unwrap();
        assert_eq!(
            enter.partition_directive,
            StagingPartitionDirective::RejectAllWrites
        );
    }
1359
    /// Test fixture with only the `region_ident`, `region_storage_path` and
    /// `region_options` fields; used to produce legacy-shaped JSON in
    /// `test_compatible_serialize_open_region`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }
1366
    /// Checks that JSON produced from `LegacyOpenRegion` deserializes into `OpenRegion`
    /// with the missing fields filled in (empty WAL options, no WAL-replay skip, no
    /// reason, empty requirements).
    #[test]
    fn test_compatible_serialize_open_region() {
        let region_ident = RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        };
        let region_storage_path = "test/foo".to_string();
        let region_options = HashMap::from([
            ("a".to_string(), "aa".to_string()),
            ("b".to_string(), "bb".to_string()),
        ]);

        // Serialize a legacy OpenRegion.
        let legacy_open_region = LegacyOpenRegion {
            region_ident: region_ident.clone(),
            region_storage_path: region_storage_path.clone(),
            region_options: region_options.clone(),
        };
        let serialized = serde_json::to_string(&legacy_open_region).unwrap();

        // Deserialize to OpenRegion.
        // The target type is inferred from the comparison with `expected` below.
        let deserialized = serde_json::from_str(&serialized).unwrap();
        let expected = OpenRegion {
            region_ident,
            region_storage_path,
            region_options,
            region_wal_options: HashMap::new(),
            skip_wal_replay: false,
            reason: None,
            requirements: RegionRequirements::empty(),
        };
        assert_eq!(expected, deserialized);
    }
1402
    /// Checks that `region_wal_options` whose values are JSON-encoded strings are parsed
    /// into `WalOptions`, keyed by the numeric form of the map key.
    #[test]
    fn test_deserialize_open_region_with_legacy_region_wal_options() {
        let open_region = r#"{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{"1":"{\"wal.provider\":\"raft_engine\"}"},"skip_wal_replay":false}"#;

        let open_region: OpenRegion = serde_json::from_str(open_region).unwrap();

        assert_eq!(
            open_region.region_wal_options,
            HashMap::from([(1, WalOptions::RaftEngine)])
        );
    }
1414
    /// Checks that a set `reason` and non-empty `requirements` appear in the serialized
    /// `OpenRegion` and survive a round trip.
    #[test]
    fn test_serialize_open_region_with_reason_and_requirements() {
        let open_region = OpenRegion::new(
            RegionIdent {
                datanode_id: 2,
                table_id: 1024,
                region_number: 1,
                engine: "mito2".to_string(),
            },
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
            Some(OpenRegionReason::RegionMigration),
            RegionRequirements::object_storage(),
        );

        let serialized = serde_json::to_string(&open_region).unwrap();
        assert!(serialized.contains(r#""reason":"RegionMigration""#));
        assert!(serialized.contains(r#""object_storage":true"#));

        // Round trip: both optional parts must come back unchanged.
        let deserialized: OpenRegion = serde_json::from_str(&serialized).unwrap();
        assert_eq!(Some(OpenRegionReason::RegionMigration), deserialized.reason);
        assert_eq!(
            RegionRequirements::object_storage(),
            deserialized.requirements
        );
    }
1443
    /// Checks the fields and predicate helpers of `FlushRegions` values built through
    /// `sync_single`, `async_batch`, `sync_batch` and `with_reason`.
    #[test]
    fn test_flush_regions_creation() {
        let region_id = RegionId::new(1024, 1);

        // Single region sync flush
        let single_sync = FlushRegions::sync_single(region_id);
        assert_eq!(single_sync.region_ids, vec![region_id]);
        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
        assert!(!single_sync.is_hint());
        assert!(single_sync.is_sync());
        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
        assert_eq!(single_sync.reason, None);
        assert!(single_sync.is_single_region());
        assert_eq!(single_sync.single_region_id(), Some(region_id));

        // Batch async flush (hint)
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
        let batch_async = FlushRegions::async_batch(region_ids.clone());
        assert_eq!(batch_async.region_ids, region_ids);
        assert_eq!(batch_async.strategy, FlushStrategy::Async);
        assert!(batch_async.is_hint());
        assert!(!batch_async.is_sync());
        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
        assert_eq!(batch_async.reason, None);
        assert!(!batch_async.is_single_region());
        assert_eq!(batch_async.single_region_id(), None);

        // Batch sync flush
        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        assert_eq!(batch_sync.region_ids, region_ids);
        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
        assert!(!batch_sync.is_hint());
        assert!(batch_sync.is_sync());
        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
        assert_eq!(batch_sync.reason, None);

        // Attaching a reason sets the `reason` field on the returned value.
        let with_reason = batch_sync.with_reason(RegionFlushReason::RemoteWalPrune);
        assert_eq!(with_reason.reason, Some(RegionFlushReason::RemoteWalPrune));
    }
1483
    /// Checks the `RegionId` -> `FlushRegions` conversion and a `FlushRegions` value
    /// built directly from a struct literal.
    #[test]
    fn test_flush_regions_conversion() {
        let region_id = RegionId::new(1024, 1);

        // Converting a region id yields a sync flush of that single region.
        let from_region_id: FlushRegions = region_id.into();
        assert_eq!(from_region_id.region_ids, vec![region_id]);
        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
        assert!(!from_region_id.is_hint());
        assert!(from_region_id.is_sync());

        // Test explicit construction through a struct literal (async strategy => hint).
        let flush_regions = FlushRegions {
            region_ids: vec![region_id],
            strategy: FlushStrategy::Async,
            error_strategy: FlushErrorStrategy::TryAll,
            reason: None,
        };
        assert_eq!(flush_regions.region_ids, vec![region_id]);
        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
        assert!(flush_regions.is_hint());
        assert!(!flush_regions.is_sync());
    }
1506
    /// Checks the `FlushRegionReply` constructors and its conversion to `SimpleReply`.
    #[test]
    fn test_flush_region_reply() {
        let region_id = RegionId::new(1024, 1);

        // Successful single region reply
        let success_reply = FlushRegionReply::success_single(region_id);
        assert!(success_reply.overall_success);
        assert_eq!(success_reply.results.len(), 1);
        assert_eq!(success_reply.results[0].0, region_id);
        assert!(success_reply.results[0].1.is_ok());

        // Failed single region reply
        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
        assert!(!error_reply.overall_success);
        assert_eq!(error_reply.results.len(), 1);
        assert_eq!(error_reply.results[0].0, region_id);
        assert!(error_reply.results[0].1.is_err());

        // Batch reply: one failing entry makes the overall result a failure.
        let region_id2 = RegionId::new(1024, 2);
        let results = vec![
            (region_id, Ok(())),
            (region_id2, Err("flush failed".to_string())),
        ];
        let batch_reply = FlushRegionReply::from_results(results);
        assert!(!batch_reply.overall_success);
        assert_eq!(batch_reply.results.len(), 2);

        // Conversion to SimpleReply keeps the failure and the error text.
        let simple_reply = batch_reply.to_simple_reply();
        assert!(!simple_reply.result);
        assert!(simple_reply.error.is_some());
        assert!(simple_reply.error.unwrap().contains("flush failed"));
    }
1541
1542    #[test]
1543    fn test_serialize_flush_regions_instruction() {
1544        let region_id = RegionId::new(1024, 1);
1545        let flush_regions = FlushRegions::sync_single(region_id);
1546        let instruction = Instruction::FlushRegions(flush_regions.clone());
1547
1548        let serialized = serde_json::to_string(&instruction).unwrap();
1549        assert!(!serialized.contains("reason"));
1550        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1551
1552        match deserialized {
1553            Instruction::FlushRegions(fr) => {
1554                assert_eq!(fr.region_ids, vec![region_id]);
1555                assert_eq!(fr.strategy, FlushStrategy::Sync);
1556                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1557                assert_eq!(fr.reason, None);
1558            }
1559            _ => panic!("Expected FlushRegions instruction"),
1560        }
1561
1562        let legacy = r#"{"FlushRegions":{"region_ids":[4398046511105],"strategy":"Sync","error_strategy":"FailFast"}}"#;
1563        let deserialized: Instruction = serde_json::from_str(legacy).unwrap();
1564        match deserialized {
1565            Instruction::FlushRegions(fr) => {
1566                assert_eq!(fr.region_ids, vec![region_id]);
1567                assert_eq!(fr.strategy, FlushStrategy::Sync);
1568                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1569                assert_eq!(fr.reason, None);
1570            }
1571            _ => panic!("Expected FlushRegions instruction"),
1572        }
1573
1574        let flush_regions = FlushRegions::async_batch(vec![region_id])
1575            .with_reason(RegionFlushReason::RemoteWalPrune);
1576        let instruction = Instruction::FlushRegions(flush_regions);
1577        let serialized = serde_json::to_string(&instruction).unwrap();
1578        assert!(serialized.contains(r#""reason":"RemoteWalPrune""#));
1579        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1580        match deserialized {
1581            Instruction::FlushRegions(fr) => {
1582                assert_eq!(fr.reason, Some(RegionFlushReason::RemoteWalPrune));
1583            }
1584            _ => panic!("Expected FlushRegions instruction"),
1585        }
1586    }
1587
1588    #[test]
1589    fn test_serialize_flush_regions_batch_instruction() {
1590        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1591        let flush_regions =
1592            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1593        let instruction = Instruction::FlushRegions(flush_regions);
1594
1595        let serialized = serde_json::to_string(&instruction).unwrap();
1596        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1597
1598        match deserialized {
1599            Instruction::FlushRegions(fr) => {
1600                assert_eq!(fr.region_ids, region_ids);
1601                assert_eq!(fr.strategy, FlushStrategy::Sync);
1602                assert!(!fr.is_hint());
1603                assert!(fr.is_sync());
1604                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1605                assert_eq!(fr.reason, None);
1606            }
1607            _ => panic!("Expected FlushRegions instruction"),
1608        }
1609    }
1610
    /// Round-trips a `GetFileRefs` reply holding file refs and manifest versions for two
    /// regions through JSON and expects an equal value back.
    #[test]
    fn test_serialize_get_file_refs_instruction_reply() {
        let mut manifest = FileRefsManifest::default();
        let r0 = RegionId::new(1024, 1);
        let r1 = RegionId::new(1024, 2);
        manifest.file_refs.insert(
            r0,
            HashSet::from([FileRef::new(r0, FileId::random(), None)]),
        );
        manifest.file_refs.insert(
            r1,
            HashSet::from([FileRef::new(r1, FileId::random(), None)]),
        );
        manifest.manifest_version.insert(r0, 10);
        manifest.manifest_version.insert(r1, 20);

        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
            file_refs_manifest: manifest,
            success: true,
            error: None,
        });

        let serialized = serde_json::to_string(&instruction_reply).unwrap();
        // The target type is inferred from the comparison with `instruction_reply` below.
        let deserialized = serde_json::from_str(&serialized).unwrap();

        assert_eq!(instruction_reply, deserialized);
    }
1638}