common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// The [RegionId].
59    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
60    #[serde(default)]
61    pub region_id: RegionId,
62    /// Returns the `last_entry_id` if available.
63    pub last_entry_id: Option<u64>,
64    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
65    pub metadata_last_entry_id: Option<u64>,
66    /// Indicates whether the region exists.
67    pub exists: bool,
68    /// Return error if any during the operation.
69    pub error: Option<String>,
70}
71
72impl Display for DowngradeRegionReply {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        write!(
75            f,
76            "(last_entry_id={:?}, exists={}, error={:?})",
77            self.last_entry_id, self.exists, self.error
78        )
79    }
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
83pub struct SimpleReply {
84    pub result: bool,
85    pub error: Option<String>,
86}
87
88/// Reply for flush region operations with support for batch results.
89#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
90pub struct FlushRegionReply {
91    /// Results for each region that was attempted to be flushed.
92    /// For single region flushes, this will contain one result.
93    /// For batch flushes, this contains results for all attempted regions.
94    pub results: Vec<(RegionId, Result<(), String>)>,
95    /// Overall success: true if all regions were flushed successfully.
96    pub overall_success: bool,
97}
98
99impl FlushRegionReply {
100    /// Create a successful single region reply.
101    pub fn success_single(region_id: RegionId) -> Self {
102        Self {
103            results: vec![(region_id, Ok(()))],
104            overall_success: true,
105        }
106    }
107
108    /// Create a failed single region reply.
109    pub fn error_single(region_id: RegionId, error: String) -> Self {
110        Self {
111            results: vec![(region_id, Err(error))],
112            overall_success: false,
113        }
114    }
115
116    /// Create a batch reply from individual results.
117    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118        let overall_success = results.iter().all(|(_, result)| result.is_ok());
119        Self {
120            results,
121            overall_success,
122        }
123    }
124
125    /// Convert to SimpleReply for backward compatibility.
126    pub fn to_simple_reply(&self) -> SimpleReply {
127        if self.overall_success {
128            SimpleReply {
129                result: true,
130                error: None,
131            }
132        } else {
133            let errors: Vec<String> = self
134                .results
135                .iter()
136                .filter_map(|(region_id, result)| {
137                    result
138                        .as_ref()
139                        .err()
140                        .map(|err| format!("{}: {}", region_id, err))
141                })
142                .collect();
143            SimpleReply {
144                result: false,
145                error: Some(errors.join("; ")),
146            }
147        }
148    }
149}
150
151impl Display for SimpleReply {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(f, "(result={}, error={:?})", self.result, self.error)
154    }
155}
156
157impl Display for FlushRegionReply {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        let results_str = self
160            .results
161            .iter()
162            .map(|(region_id, result)| match result {
163                Ok(()) => format!("{}:OK", region_id),
164                Err(err) => format!("{}:ERR({})", region_id, err),
165            })
166            .collect::<Vec<_>>()
167            .join(", ");
168        write!(
169            f,
170            "(overall_success={}, results=[{}])",
171            self.overall_success, results_str
172        )
173    }
174}
175
176impl Display for OpenRegion {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "OpenRegion(region_ident={}, region_storage_path={})",
181            self.region_ident, self.region_storage_path
182        )
183    }
184}
185
186#[serde_with::serde_as]
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub struct OpenRegion {
189    pub region_ident: RegionIdent,
190    pub region_storage_path: String,
191    pub region_options: HashMap<String, String>,
192    #[serde(default)]
193    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
194    pub region_wal_options: HashMap<RegionNumber, String>,
195    #[serde(default)]
196    pub skip_wal_replay: bool,
197}
198
199impl OpenRegion {
200    pub fn new(
201        region_ident: RegionIdent,
202        path: &str,
203        region_options: HashMap<String, String>,
204        region_wal_options: HashMap<RegionNumber, String>,
205        skip_wal_replay: bool,
206    ) -> Self {
207        Self {
208            region_ident,
209            region_storage_path: path.to_string(),
210            region_options,
211            region_wal_options,
212            skip_wal_replay,
213        }
214    }
215}
216
217/// The instruction of downgrading leader region.
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
219pub struct DowngradeRegion {
220    /// The [RegionId].
221    pub region_id: RegionId,
222    /// The timeout of waiting for flush the region.
223    ///
224    /// `None` stands for don't flush before downgrading the region.
225    #[serde(default)]
226    pub flush_timeout: Option<Duration>,
227}
228
229impl Display for DowngradeRegion {
230    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231        write!(
232            f,
233            "DowngradeRegion(region_id={}, flush_timeout={:?})",
234            self.region_id, self.flush_timeout,
235        )
236    }
237}
238
239/// Upgrades a follower region to leader region.
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
241pub struct UpgradeRegion {
242    /// The [RegionId].
243    pub region_id: RegionId,
244    /// The `last_entry_id` of old leader region.
245    pub last_entry_id: Option<u64>,
246    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
247    pub metadata_last_entry_id: Option<u64>,
248    /// The timeout of waiting for a wal replay.
249    ///
250    /// `None` stands for no wait,
251    /// it's helpful to verify whether the leader region is ready.
252    #[serde(with = "humantime_serde")]
253    pub replay_timeout: Duration,
254    /// The hint for replaying memtable.
255    #[serde(default)]
256    pub location_id: Option<u64>,
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub replay_entry_id: Option<u64>,
259    #[serde(default, skip_serializing_if = "Option::is_none")]
260    pub metadata_replay_entry_id: Option<u64>,
261}
262
263impl UpgradeRegion {
264    /// Sets the replay entry id.
265    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266        self.replay_entry_id = replay_entry_id;
267        self
268    }
269
270    /// Sets the metadata replay entry id.
271    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272        self.metadata_replay_entry_id = metadata_replay_entry_id;
273        self
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278/// The identifier of cache.
279pub enum CacheIdent {
280    FlowId(FlowId),
281    /// Indicate change of address of flownode.
282    FlowNodeAddressChange(u64),
283    FlowName(FlowName),
284    TableId(TableId),
285    TableName(TableName),
286    SchemaName(SchemaName),
287    CreateFlow(CreateFlow),
288    DropFlow(DropFlow),
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
292pub struct CreateFlow {
293    /// The unique identifier for the flow.
294    pub flow_id: FlowId,
295    pub source_table_ids: Vec<TableId>,
296    /// Mapping of flow partition to peer information
297    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
301pub struct DropFlow {
302    pub flow_id: FlowId,
303    pub source_table_ids: Vec<TableId>,
304    /// Mapping of flow partition to flownode id
305    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
306}
307
308/// Strategy for executing flush operations.
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
310pub enum FlushStrategy {
311    /// Synchronous operation that waits for completion and expects a reply
312    #[default]
313    Sync,
314    /// Asynchronous hint operation (fire-and-forget, no reply expected)
315    Async,
316}
317
318/// Error handling strategy for batch flush operations.
319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
320pub enum FlushErrorStrategy {
321    /// Abort on first error (fail-fast)
322    #[default]
323    FailFast,
324    /// Attempt to flush all regions and collect all errors
325    TryAll,
326}
327
328/// Unified flush instruction supporting both single and batch operations
329/// with configurable execution strategies and error handling.
330#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
331pub struct FlushRegions {
332    /// List of region IDs to flush. Can contain a single region or multiple regions.
333    pub region_ids: Vec<RegionId>,
334    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
335    #[serde(default)]
336    pub strategy: FlushStrategy,
337    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
338    #[serde(default)]
339    pub error_strategy: FlushErrorStrategy,
340}
341
342impl FlushRegions {
343    /// Create synchronous single-region flush
344    pub fn sync_single(region_id: RegionId) -> Self {
345        Self {
346            region_ids: vec![region_id],
347            strategy: FlushStrategy::Sync,
348            error_strategy: FlushErrorStrategy::FailFast,
349        }
350    }
351
352    /// Create asynchronous batch flush (fire-and-forget)
353    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
354        Self {
355            region_ids,
356            strategy: FlushStrategy::Async,
357            error_strategy: FlushErrorStrategy::TryAll,
358        }
359    }
360
361    /// Create synchronous batch flush with error strategy
362    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
363        Self {
364            region_ids,
365            strategy: FlushStrategy::Sync,
366            error_strategy,
367        }
368    }
369
370    /// Check if this is a single region flush.
371    pub fn is_single_region(&self) -> bool {
372        self.region_ids.len() == 1
373    }
374
375    /// Get the single region ID if this is a single region flush.
376    pub fn single_region_id(&self) -> Option<RegionId> {
377        if self.is_single_region() {
378            self.region_ids.first().copied()
379        } else {
380            None
381        }
382    }
383
384    /// Check if this is a hint (asynchronous) operation.
385    pub fn is_hint(&self) -> bool {
386        matches!(self.strategy, FlushStrategy::Async)
387    }
388
389    /// Check if this is a synchronous operation.
390    pub fn is_sync(&self) -> bool {
391        matches!(self.strategy, FlushStrategy::Sync)
392    }
393}
394
395impl From<RegionId> for FlushRegions {
396    fn from(region_id: RegionId) -> Self {
397        Self::sync_single(region_id)
398    }
399}
400
401#[derive(Debug, Deserialize)]
402#[serde(untagged)]
403enum SingleOrMultiple<T> {
404    Single(T),
405    Multiple(Vec<T>),
406}
407
408fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
409where
410    D: Deserializer<'de>,
411    T: Deserialize<'de>,
412{
413    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
414    Ok(match helper {
415        SingleOrMultiple::Single(x) => vec![x],
416        SingleOrMultiple::Multiple(xs) => xs,
417    })
418}
419
420/// Instruction to get file references for specified regions.
421#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
422pub struct GetFileRefs {
423    /// List of region IDs to get file references from active FileHandles (in-memory).
424    pub query_regions: Vec<RegionId>,
425    /// Mapping from the source region ID (where to read the manifest) to
426    /// the target region IDs (whose file references to look for).
427    /// Key: The region ID of the manifest.
428    /// Value: The list of region IDs to find references for in that manifest.
429    pub related_regions: HashMap<RegionId, Vec<RegionId>>,
430}
431
432impl Display for GetFileRefs {
433    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
434        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
435    }
436}
437
438/// Instruction to trigger garbage collection for a region.
439#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
440pub struct GcRegions {
441    /// The region ID to perform GC on, only regions that are currently on the given datanode can be garbage collected, regions not on the datanode will report errors.
442    pub regions: Vec<RegionId>,
443    /// The file references manifest containing temporary file references.
444    pub file_refs_manifest: FileRefsManifest,
445    /// Whether to perform a full file listing to find orphan files.
446    pub full_file_listing: bool,
447}
448
449impl Display for GcRegions {
450    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
451        write!(
452            f,
453            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
454            self.regions,
455            self.file_refs_manifest.file_refs.len(),
456            self.full_file_listing
457        )
458    }
459}
460
461/// Reply for GetFileRefs instruction.
462#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
463pub struct GetFileRefsReply {
464    /// The file references manifest.
465    pub file_refs_manifest: FileRefsManifest,
466    /// Whether the operation was successful.
467    pub success: bool,
468    /// Error message if any.
469    pub error: Option<String>,
470}
471
472impl Display for GetFileRefsReply {
473    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
474        write!(
475            f,
476            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
477            self.success,
478            self.file_refs_manifest.file_refs.len(),
479            self.error
480        )
481    }
482}
483
484/// Reply for GC instruction.
485#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
486pub struct GcRegionsReply {
487    pub result: Result<GcReport, String>,
488}
489
490impl Display for GcRegionsReply {
491    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
492        write!(
493            f,
494            "GcReply(result={})",
495            match &self.result {
496                Ok(report) => format!(
497                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
498                    report.deleted_files.len(),
499                    report.need_retry_regions.len()
500                ),
501                Err(err) => format!("Err({})", err),
502            }
503        )
504    }
505}
506
507#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
508pub enum Instruction {
509    /// Opens regions.
510    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
511    OpenRegions(Vec<OpenRegion>),
512    /// Closes regions.
513    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
514    CloseRegions(Vec<RegionIdent>),
515    /// Upgrades regions.
516    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
517    UpgradeRegions(Vec<UpgradeRegion>),
518    #[serde(
519        deserialize_with = "single_or_multiple_from",
520        alias = "DowngradeRegion"
521    )]
522    /// Downgrades regions.
523    DowngradeRegions(Vec<DowngradeRegion>),
524    /// Invalidates batch cache.
525    InvalidateCaches(Vec<CacheIdent>),
526    /// Flushes regions.
527    FlushRegions(FlushRegions),
528    /// Gets file references for regions.
529    GetFileRefs(GetFileRefs),
530    /// Triggers garbage collection for a region.
531    GcRegions(GcRegions),
532}
533
534impl Instruction {
535    /// Converts the instruction into a vector of [OpenRegion].
536    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
537        match self {
538            Self::OpenRegions(open_regions) => Some(open_regions),
539            _ => None,
540        }
541    }
542
543    /// Converts the instruction into a vector of [RegionIdent].
544    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
545        match self {
546            Self::CloseRegions(close_regions) => Some(close_regions),
547            _ => None,
548        }
549    }
550
551    /// Converts the instruction into a [FlushRegions].
552    pub fn into_flush_regions(self) -> Option<FlushRegions> {
553        match self {
554            Self::FlushRegions(flush_regions) => Some(flush_regions),
555            _ => None,
556        }
557    }
558
559    /// Converts the instruction into a [DowngradeRegion].
560    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
561        match self {
562            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
563            _ => None,
564        }
565    }
566
567    /// Converts the instruction into a [UpgradeRegion].
568    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
569        match self {
570            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
571            _ => None,
572        }
573    }
574
575    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
576        match self {
577            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
578            _ => None,
579        }
580    }
581
582    pub fn into_gc_regions(self) -> Option<GcRegions> {
583        match self {
584            Self::GcRegions(gc_regions) => Some(gc_regions),
585            _ => None,
586        }
587    }
588}
589
590/// The reply of [UpgradeRegion].
591#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
592pub struct UpgradeRegionReply {
593    /// The [RegionId].
594    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
595    #[serde(default)]
596    pub region_id: RegionId,
597    /// Returns true if `last_entry_id` has been replayed to the latest.
598    pub ready: bool,
599    /// Indicates whether the region exists.
600    pub exists: bool,
601    /// Returns error if any.
602    pub error: Option<String>,
603}
604
605impl Display for UpgradeRegionReply {
606    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
607        write!(
608            f,
609            "(ready={}, exists={}, error={:?})",
610            self.ready, self.exists, self.error
611        )
612    }
613}
614
615#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
616pub struct DowngradeRegionsReply {
617    pub replies: Vec<DowngradeRegionReply>,
618}
619
620impl DowngradeRegionsReply {
621    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
622        Self { replies }
623    }
624
625    pub fn single(reply: DowngradeRegionReply) -> Self {
626        Self::new(vec![reply])
627    }
628}
629
630#[derive(Deserialize)]
631#[serde(untagged)]
632enum DowngradeRegionsCompat {
633    Single(DowngradeRegionReply),
634    Multiple(DowngradeRegionsReply),
635}
636
637fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
638where
639    D: Deserializer<'de>,
640{
641    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
642    Ok(match helper {
643        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
644        DowngradeRegionsCompat::Multiple(reply) => reply,
645    })
646}
647
648#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
649pub struct UpgradeRegionsReply {
650    pub replies: Vec<UpgradeRegionReply>,
651}
652
653impl UpgradeRegionsReply {
654    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
655        Self { replies }
656    }
657
658    pub fn single(reply: UpgradeRegionReply) -> Self {
659        Self::new(vec![reply])
660    }
661}
662
663#[derive(Deserialize)]
664#[serde(untagged)]
665enum UpgradeRegionsCompat {
666    Single(UpgradeRegionReply),
667    Multiple(UpgradeRegionsReply),
668}
669
670fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
671where
672    D: Deserializer<'de>,
673{
674    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
675    Ok(match helper {
676        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
677        UpgradeRegionsCompat::Multiple(reply) => reply,
678    })
679}
680
681#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
682#[serde(tag = "type", rename_all = "snake_case")]
683pub enum InstructionReply {
684    #[serde(alias = "open_region")]
685    OpenRegions(SimpleReply),
686    #[serde(alias = "close_region")]
687    CloseRegions(SimpleReply),
688    #[serde(
689        deserialize_with = "upgrade_regions_compat_from",
690        alias = "upgrade_region"
691    )]
692    UpgradeRegions(UpgradeRegionsReply),
693    #[serde(
694        alias = "downgrade_region",
695        deserialize_with = "downgrade_regions_compat_from"
696    )]
697    DowngradeRegions(DowngradeRegionsReply),
698    FlushRegions(FlushRegionReply),
699    GetFileRefs(GetFileRefsReply),
700    GcRegions(GcRegionsReply),
701}
702
703impl Display for InstructionReply {
704    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
705        match self {
706            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
707            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
708            Self::UpgradeRegions(reply) => {
709                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
710            }
711            Self::DowngradeRegions(reply) => {
712                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
713            }
714            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
715            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
716            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
717        }
718    }
719}
720
721#[cfg(any(test, feature = "testing"))]
722impl InstructionReply {
723    pub fn expect_close_regions_reply(self) -> SimpleReply {
724        match self {
725            Self::CloseRegions(reply) => reply,
726            _ => panic!("Expected CloseRegions reply"),
727        }
728    }
729
730    pub fn expect_open_regions_reply(self) -> SimpleReply {
731        match self {
732            Self::OpenRegions(reply) => reply,
733            _ => panic!("Expected OpenRegions reply"),
734        }
735    }
736
737    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
738        match self {
739            Self::UpgradeRegions(reply) => reply.replies,
740            _ => panic!("Expected UpgradeRegion reply"),
741        }
742    }
743
744    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
745        match self {
746            Self::DowngradeRegions(reply) => reply.replies,
747            _ => panic!("Expected DowngradeRegion reply"),
748        }
749    }
750
751    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
752        match self {
753            Self::FlushRegions(reply) => reply,
754            _ => panic!("Expected FlushRegions reply"),
755        }
756    }
757}
758
759#[cfg(test)]
760mod tests {
761    use std::collections::HashSet;
762
763    use store_api::storage::FileId;
764
765    use super::*;
766
767    #[test]
768    fn test_serialize_instruction() {
769        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
770            RegionIdent {
771                datanode_id: 2,
772                table_id: 1024,
773                region_number: 1,
774                engine: "mito2".to_string(),
775            },
776            "test/foo",
777            HashMap::new(),
778            HashMap::new(),
779            false,
780        )]);
781
782        let serialized = serde_json::to_string(&open_region).unwrap();
783        assert_eq!(
784            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
785            serialized
786        );
787
788        let close_region = Instruction::CloseRegions(vec![RegionIdent {
789            datanode_id: 2,
790            table_id: 1024,
791            region_number: 1,
792            engine: "mito2".to_string(),
793        }]);
794
795        let serialized = serde_json::to_string(&close_region).unwrap();
796        assert_eq!(
797            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
798            serialized
799        );
800
801        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
802            region_id: RegionId::new(1024, 1),
803            last_entry_id: None,
804            metadata_last_entry_id: None,
805            replay_timeout: Duration::from_millis(1000),
806            location_id: None,
807            replay_entry_id: None,
808            metadata_replay_entry_id: None,
809        }]);
810
811        let serialized = serde_json::to_string(&upgrade_region).unwrap();
812        assert_eq!(
813            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
814            serialized
815        );
816    }
817
818    #[test]
819    fn test_serialize_instruction_reply() {
820        let downgrade_region_reply = InstructionReply::DowngradeRegions(
821            DowngradeRegionsReply::single(DowngradeRegionReply {
822                region_id: RegionId::new(1024, 1),
823                last_entry_id: None,
824                metadata_last_entry_id: None,
825                exists: true,
826                error: None,
827            }),
828        );
829
830        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
831        assert_eq!(
832            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
833            serialized
834        );
835
836        let upgrade_region_reply =
837            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
838                region_id: RegionId::new(1024, 1),
839                ready: true,
840                exists: true,
841                error: None,
842            }));
843        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
844        assert_eq!(
845            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
846            serialized
847        );
848    }
849
850    #[test]
851    fn test_deserialize_instruction() {
852        // legacy open region instruction
853        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
854        let open_region_instruction: Instruction =
855            serde_json::from_str(open_region_instruction).unwrap();
856        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
857            RegionIdent {
858                datanode_id: 2,
859                table_id: 1024,
860                region_number: 1,
861                engine: "mito2".to_string(),
862            },
863            "test/foo",
864            HashMap::new(),
865            HashMap::new(),
866            false,
867        )]);
868        assert_eq!(open_region_instruction, open_region);
869
870        // legacy close region instruction
871        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
872        let close_region_instruction: Instruction =
873            serde_json::from_str(close_region_instruction).unwrap();
874        let close_region = Instruction::CloseRegions(vec![RegionIdent {
875            datanode_id: 2,
876            table_id: 1024,
877            region_number: 1,
878            engine: "mito2".to_string(),
879        }]);
880        assert_eq!(close_region_instruction, close_region);
881
882        // legacy downgrade region instruction
883        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
884        let downgrade_region_instruction: Instruction =
885            serde_json::from_str(downgrade_region_instruction).unwrap();
886        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
887            region_id: RegionId::new(1024, 1),
888            flush_timeout: Some(Duration::from_millis(1000)),
889        }]);
890        assert_eq!(downgrade_region_instruction, downgrade_region);
891
892        // legacy upgrade region instruction
893        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
894        let upgrade_region_instruction: Instruction =
895            serde_json::from_str(upgrade_region_instruction).unwrap();
896        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
897            region_id: RegionId::new(1024, 1),
898            last_entry_id: None,
899            metadata_last_entry_id: None,
900            replay_timeout: Duration::from_millis(1000),
901            location_id: None,
902            replay_entry_id: None,
903            metadata_replay_entry_id: None,
904        }]);
905        assert_eq!(upgrade_region_instruction, upgrade_region);
906    }
907
908    #[test]
909    fn test_deserialize_instruction_reply() {
910        // legacy close region reply
911        let close_region_instruction_reply =
912            r#"{"result":true,"error":null,"type":"close_region"}"#;
913        let close_region_instruction_reply: InstructionReply =
914            serde_json::from_str(close_region_instruction_reply).unwrap();
915        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
916            result: true,
917            error: None,
918        });
919        assert_eq!(close_region_instruction_reply, close_region_reply);
920
921        // legacy open region reply
922        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
923        let open_region_instruction_reply: InstructionReply =
924            serde_json::from_str(open_region_instruction_reply).unwrap();
925        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
926            result: true,
927            error: None,
928        });
929        assert_eq!(open_region_instruction_reply, open_region_reply);
930
931        // legacy downgrade region reply
932        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
933        let downgrade_region_instruction_reply: InstructionReply =
934            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
935        let downgrade_region_reply = InstructionReply::DowngradeRegions(
936            DowngradeRegionsReply::single(DowngradeRegionReply {
937                region_id: RegionId::new(1024, 1),
938                last_entry_id: None,
939                metadata_last_entry_id: None,
940                exists: true,
941                error: None,
942            }),
943        );
944        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
945
946        // legacy upgrade region reply
947        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
948        let upgrade_region_instruction_reply: InstructionReply =
949            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
950        let upgrade_region_reply =
951            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
952                region_id: RegionId::new(1024, 1),
953                ready: true,
954                exists: true,
955                error: None,
956            }));
957        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
958    }
959
960    #[derive(Debug, Clone, Serialize, Deserialize)]
961    struct LegacyOpenRegion {
962        region_ident: RegionIdent,
963        region_storage_path: String,
964        region_options: HashMap<String, String>,
965    }
966
967    #[test]
968    fn test_compatible_serialize_open_region() {
969        let region_ident = RegionIdent {
970            datanode_id: 2,
971            table_id: 1024,
972            region_number: 1,
973            engine: "mito2".to_string(),
974        };
975        let region_storage_path = "test/foo".to_string();
976        let region_options = HashMap::from([
977            ("a".to_string(), "aa".to_string()),
978            ("b".to_string(), "bb".to_string()),
979        ]);
980
981        // Serialize a legacy OpenRegion.
982        let legacy_open_region = LegacyOpenRegion {
983            region_ident: region_ident.clone(),
984            region_storage_path: region_storage_path.clone(),
985            region_options: region_options.clone(),
986        };
987        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
988
989        // Deserialize to OpenRegion.
990        let deserialized = serde_json::from_str(&serialized).unwrap();
991        let expected = OpenRegion {
992            region_ident,
993            region_storage_path,
994            region_options,
995            region_wal_options: HashMap::new(),
996            skip_wal_replay: false,
997        };
998        assert_eq!(expected, deserialized);
999    }
1000
1001    #[test]
1002    fn test_flush_regions_creation() {
1003        let region_id = RegionId::new(1024, 1);
1004
1005        // Single region sync flush
1006        let single_sync = FlushRegions::sync_single(region_id);
1007        assert_eq!(single_sync.region_ids, vec![region_id]);
1008        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1009        assert!(!single_sync.is_hint());
1010        assert!(single_sync.is_sync());
1011        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1012        assert!(single_sync.is_single_region());
1013        assert_eq!(single_sync.single_region_id(), Some(region_id));
1014
1015        // Batch async flush (hint)
1016        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1017        let batch_async = FlushRegions::async_batch(region_ids.clone());
1018        assert_eq!(batch_async.region_ids, region_ids);
1019        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1020        assert!(batch_async.is_hint());
1021        assert!(!batch_async.is_sync());
1022        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1023        assert!(!batch_async.is_single_region());
1024        assert_eq!(batch_async.single_region_id(), None);
1025
1026        // Batch sync flush
1027        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1028        assert_eq!(batch_sync.region_ids, region_ids);
1029        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1030        assert!(!batch_sync.is_hint());
1031        assert!(batch_sync.is_sync());
1032        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1033    }
1034
1035    #[test]
1036    fn test_flush_regions_conversion() {
1037        let region_id = RegionId::new(1024, 1);
1038
1039        let from_region_id: FlushRegions = region_id.into();
1040        assert_eq!(from_region_id.region_ids, vec![region_id]);
1041        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1042        assert!(!from_region_id.is_hint());
1043        assert!(from_region_id.is_sync());
1044
1045        // Test default construction
1046        let flush_regions = FlushRegions {
1047            region_ids: vec![region_id],
1048            strategy: FlushStrategy::Async,
1049            error_strategy: FlushErrorStrategy::TryAll,
1050        };
1051        assert_eq!(flush_regions.region_ids, vec![region_id]);
1052        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1053        assert!(flush_regions.is_hint());
1054        assert!(!flush_regions.is_sync());
1055    }
1056
1057    #[test]
1058    fn test_flush_region_reply() {
1059        let region_id = RegionId::new(1024, 1);
1060
1061        // Successful single region reply
1062        let success_reply = FlushRegionReply::success_single(region_id);
1063        assert!(success_reply.overall_success);
1064        assert_eq!(success_reply.results.len(), 1);
1065        assert_eq!(success_reply.results[0].0, region_id);
1066        assert!(success_reply.results[0].1.is_ok());
1067
1068        // Failed single region reply
1069        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1070        assert!(!error_reply.overall_success);
1071        assert_eq!(error_reply.results.len(), 1);
1072        assert_eq!(error_reply.results[0].0, region_id);
1073        assert!(error_reply.results[0].1.is_err());
1074
1075        // Batch reply
1076        let region_id2 = RegionId::new(1024, 2);
1077        let results = vec![
1078            (region_id, Ok(())),
1079            (region_id2, Err("flush failed".to_string())),
1080        ];
1081        let batch_reply = FlushRegionReply::from_results(results);
1082        assert!(!batch_reply.overall_success);
1083        assert_eq!(batch_reply.results.len(), 2);
1084
1085        // Conversion to SimpleReply
1086        let simple_reply = batch_reply.to_simple_reply();
1087        assert!(!simple_reply.result);
1088        assert!(simple_reply.error.is_some());
1089        assert!(simple_reply.error.unwrap().contains("flush failed"));
1090    }
1091
1092    #[test]
1093    fn test_serialize_flush_regions_instruction() {
1094        let region_id = RegionId::new(1024, 1);
1095        let flush_regions = FlushRegions::sync_single(region_id);
1096        let instruction = Instruction::FlushRegions(flush_regions.clone());
1097
1098        let serialized = serde_json::to_string(&instruction).unwrap();
1099        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1100
1101        match deserialized {
1102            Instruction::FlushRegions(fr) => {
1103                assert_eq!(fr.region_ids, vec![region_id]);
1104                assert_eq!(fr.strategy, FlushStrategy::Sync);
1105                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1106            }
1107            _ => panic!("Expected FlushRegions instruction"),
1108        }
1109    }
1110
1111    #[test]
1112    fn test_serialize_flush_regions_batch_instruction() {
1113        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1114        let flush_regions =
1115            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1116        let instruction = Instruction::FlushRegions(flush_regions);
1117
1118        let serialized = serde_json::to_string(&instruction).unwrap();
1119        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1120
1121        match deserialized {
1122            Instruction::FlushRegions(fr) => {
1123                assert_eq!(fr.region_ids, region_ids);
1124                assert_eq!(fr.strategy, FlushStrategy::Sync);
1125                assert!(!fr.is_hint());
1126                assert!(fr.is_sync());
1127                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1128            }
1129            _ => panic!("Expected FlushRegions instruction"),
1130        }
1131    }
1132
1133    #[test]
1134    fn test_serialize_get_file_refs_instruction_reply() {
1135        let mut manifest = FileRefsManifest::default();
1136        let r0 = RegionId::new(1024, 1);
1137        let r1 = RegionId::new(1024, 2);
1138        manifest
1139            .file_refs
1140            .insert(r0, HashSet::from([FileId::random()]));
1141        manifest
1142            .file_refs
1143            .insert(r1, HashSet::from([FileId::random()]));
1144        manifest.manifest_version.insert(r0, 10);
1145        manifest.manifest_version.insert(r1, 20);
1146
1147        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1148            file_refs_manifest: manifest,
1149            success: true,
1150            error: None,
1151        });
1152
1153        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1154        let deserialized = serde_json::from_str(&serialized).unwrap();
1155
1156        assert_eq!(instruction_reply, deserialized);
1157    }
1158}