common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// The [RegionId].
59    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
60    #[serde(default)]
61    pub region_id: RegionId,
62    /// Returns the `last_entry_id` if available.
63    pub last_entry_id: Option<u64>,
64    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
65    pub metadata_last_entry_id: Option<u64>,
66    /// Indicates whether the region exists.
67    pub exists: bool,
68    /// Return error if any during the operation.
69    pub error: Option<String>,
70}
71
72impl Display for DowngradeRegionReply {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        write!(
75            f,
76            "(last_entry_id={:?}, exists={}, error={:?})",
77            self.last_entry_id, self.exists, self.error
78        )
79    }
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
83pub struct SimpleReply {
84    pub result: bool,
85    pub error: Option<String>,
86}
87
88/// Reply for flush region operations with support for batch results.
89#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
90pub struct FlushRegionReply {
91    /// Results for each region that was attempted to be flushed.
92    /// For single region flushes, this will contain one result.
93    /// For batch flushes, this contains results for all attempted regions.
94    pub results: Vec<(RegionId, Result<(), String>)>,
95    /// Overall success: true if all regions were flushed successfully.
96    pub overall_success: bool,
97}
98
99impl FlushRegionReply {
100    /// Create a successful single region reply.
101    pub fn success_single(region_id: RegionId) -> Self {
102        Self {
103            results: vec![(region_id, Ok(()))],
104            overall_success: true,
105        }
106    }
107
108    /// Create a failed single region reply.
109    pub fn error_single(region_id: RegionId, error: String) -> Self {
110        Self {
111            results: vec![(region_id, Err(error))],
112            overall_success: false,
113        }
114    }
115
116    /// Create a batch reply from individual results.
117    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118        let overall_success = results.iter().all(|(_, result)| result.is_ok());
119        Self {
120            results,
121            overall_success,
122        }
123    }
124
125    /// Convert to SimpleReply for backward compatibility.
126    pub fn to_simple_reply(&self) -> SimpleReply {
127        if self.overall_success {
128            SimpleReply {
129                result: true,
130                error: None,
131            }
132        } else {
133            let errors: Vec<String> = self
134                .results
135                .iter()
136                .filter_map(|(region_id, result)| {
137                    result
138                        .as_ref()
139                        .err()
140                        .map(|err| format!("{}: {}", region_id, err))
141                })
142                .collect();
143            SimpleReply {
144                result: false,
145                error: Some(errors.join("; ")),
146            }
147        }
148    }
149}
150
151impl Display for SimpleReply {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(f, "(result={}, error={:?})", self.result, self.error)
154    }
155}
156
157impl Display for FlushRegionReply {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        let results_str = self
160            .results
161            .iter()
162            .map(|(region_id, result)| match result {
163                Ok(()) => format!("{}:OK", region_id),
164                Err(err) => format!("{}:ERR({})", region_id, err),
165            })
166            .collect::<Vec<_>>()
167            .join(", ");
168        write!(
169            f,
170            "(overall_success={}, results=[{}])",
171            self.overall_success, results_str
172        )
173    }
174}
175
176impl Display for OpenRegion {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "OpenRegion(region_ident={}, region_storage_path={})",
181            self.region_ident, self.region_storage_path
182        )
183    }
184}
185
186#[serde_with::serde_as]
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub struct OpenRegion {
189    pub region_ident: RegionIdent,
190    pub region_storage_path: String,
191    pub region_options: HashMap<String, String>,
192    #[serde(default)]
193    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
194    pub region_wal_options: HashMap<RegionNumber, String>,
195    #[serde(default)]
196    pub skip_wal_replay: bool,
197}
198
199impl OpenRegion {
200    pub fn new(
201        region_ident: RegionIdent,
202        path: &str,
203        region_options: HashMap<String, String>,
204        region_wal_options: HashMap<RegionNumber, String>,
205        skip_wal_replay: bool,
206    ) -> Self {
207        Self {
208            region_ident,
209            region_storage_path: path.to_string(),
210            region_options,
211            region_wal_options,
212            skip_wal_replay,
213        }
214    }
215}
216
217/// The instruction of downgrading leader region.
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
219pub struct DowngradeRegion {
220    /// The [RegionId].
221    pub region_id: RegionId,
222    /// The timeout of waiting for flush the region.
223    ///
224    /// `None` stands for don't flush before downgrading the region.
225    #[serde(default)]
226    pub flush_timeout: Option<Duration>,
227}
228
229impl Display for DowngradeRegion {
230    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231        write!(
232            f,
233            "DowngradeRegion(region_id={}, flush_timeout={:?})",
234            self.region_id, self.flush_timeout,
235        )
236    }
237}
238
239/// Upgrades a follower region to leader region.
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
241pub struct UpgradeRegion {
242    /// The [RegionId].
243    pub region_id: RegionId,
244    /// The `last_entry_id` of old leader region.
245    pub last_entry_id: Option<u64>,
246    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
247    pub metadata_last_entry_id: Option<u64>,
248    /// The timeout of waiting for a wal replay.
249    ///
250    /// `None` stands for no wait,
251    /// it's helpful to verify whether the leader region is ready.
252    #[serde(with = "humantime_serde")]
253    pub replay_timeout: Duration,
254    /// The hint for replaying memtable.
255    #[serde(default)]
256    pub location_id: Option<u64>,
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub replay_entry_id: Option<u64>,
259    #[serde(default, skip_serializing_if = "Option::is_none")]
260    pub metadata_replay_entry_id: Option<u64>,
261}
262
263impl UpgradeRegion {
264    /// Sets the replay entry id.
265    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266        self.replay_entry_id = replay_entry_id;
267        self
268    }
269
270    /// Sets the metadata replay entry id.
271    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272        self.metadata_replay_entry_id = metadata_replay_entry_id;
273        self
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278/// The identifier of cache.
279pub enum CacheIdent {
280    FlowId(FlowId),
281    /// Indicate change of address of flownode.
282    FlowNodeAddressChange(u64),
283    FlowName(FlowName),
284    TableId(TableId),
285    TableName(TableName),
286    SchemaName(SchemaName),
287    CreateFlow(CreateFlow),
288    DropFlow(DropFlow),
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
292pub struct CreateFlow {
293    /// The unique identifier for the flow.
294    pub flow_id: FlowId,
295    pub source_table_ids: Vec<TableId>,
296    /// Mapping of flow partition to peer information
297    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
301pub struct DropFlow {
302    pub flow_id: FlowId,
303    pub source_table_ids: Vec<TableId>,
304    /// Mapping of flow partition to flownode id
305    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
306}
307
308/// Strategy for executing flush operations.
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
310pub enum FlushStrategy {
311    /// Synchronous operation that waits for completion and expects a reply
312    #[default]
313    Sync,
314    /// Asynchronous hint operation (fire-and-forget, no reply expected)
315    Async,
316}
317
318/// Error handling strategy for batch flush operations.
319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
320pub enum FlushErrorStrategy {
321    /// Abort on first error (fail-fast)
322    #[default]
323    FailFast,
324    /// Attempt to flush all regions and collect all errors
325    TryAll,
326}
327
328/// Unified flush instruction supporting both single and batch operations
329/// with configurable execution strategies and error handling.
330#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
331pub struct FlushRegions {
332    /// List of region IDs to flush. Can contain a single region or multiple regions.
333    pub region_ids: Vec<RegionId>,
334    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
335    #[serde(default)]
336    pub strategy: FlushStrategy,
337    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
338    #[serde(default)]
339    pub error_strategy: FlushErrorStrategy,
340}
341
342impl Display for FlushRegions {
343    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
344        write!(
345            f,
346            "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
347            self.region_ids, self.strategy, self.error_strategy
348        )
349    }
350}
351
352impl FlushRegions {
353    /// Create synchronous single-region flush
354    pub fn sync_single(region_id: RegionId) -> Self {
355        Self {
356            region_ids: vec![region_id],
357            strategy: FlushStrategy::Sync,
358            error_strategy: FlushErrorStrategy::FailFast,
359        }
360    }
361
362    /// Create asynchronous batch flush (fire-and-forget)
363    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
364        Self {
365            region_ids,
366            strategy: FlushStrategy::Async,
367            error_strategy: FlushErrorStrategy::TryAll,
368        }
369    }
370
371    /// Create synchronous batch flush with error strategy
372    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
373        Self {
374            region_ids,
375            strategy: FlushStrategy::Sync,
376            error_strategy,
377        }
378    }
379
380    /// Check if this is a single region flush.
381    pub fn is_single_region(&self) -> bool {
382        self.region_ids.len() == 1
383    }
384
385    /// Get the single region ID if this is a single region flush.
386    pub fn single_region_id(&self) -> Option<RegionId> {
387        if self.is_single_region() {
388            self.region_ids.first().copied()
389        } else {
390            None
391        }
392    }
393
394    /// Check if this is a hint (asynchronous) operation.
395    pub fn is_hint(&self) -> bool {
396        matches!(self.strategy, FlushStrategy::Async)
397    }
398
399    /// Check if this is a synchronous operation.
400    pub fn is_sync(&self) -> bool {
401        matches!(self.strategy, FlushStrategy::Sync)
402    }
403}
404
405impl From<RegionId> for FlushRegions {
406    fn from(region_id: RegionId) -> Self {
407        Self::sync_single(region_id)
408    }
409}
410
411#[derive(Debug, Deserialize)]
412#[serde(untagged)]
413enum SingleOrMultiple<T> {
414    Single(T),
415    Multiple(Vec<T>),
416}
417
418fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
419where
420    D: Deserializer<'de>,
421    T: Deserialize<'de>,
422{
423    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
424    Ok(match helper {
425        SingleOrMultiple::Single(x) => vec![x],
426        SingleOrMultiple::Multiple(xs) => xs,
427    })
428}
429
430/// Instruction to get file references for specified regions.
431#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
432pub struct GetFileRefs {
433    /// List of region IDs to get file references from active FileHandles (in-memory).
434    pub query_regions: Vec<RegionId>,
435    /// Mapping from the source region ID (where to read the manifest) to
436    /// the target region IDs (whose file references to look for).
437    /// Key: The region ID of the manifest.
438    /// Value: The list of region IDs to find references for in that manifest.
439    pub related_regions: HashMap<RegionId, Vec<RegionId>>,
440}
441
442impl Display for GetFileRefs {
443    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
444        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
445    }
446}
447
448/// Instruction to trigger garbage collection for a region.
449#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
450pub struct GcRegions {
451    /// The region ID to perform GC on, only regions that are currently on the given datanode can be garbage collected, regions not on the datanode will report errors.
452    pub regions: Vec<RegionId>,
453    /// The file references manifest containing temporary file references.
454    pub file_refs_manifest: FileRefsManifest,
455    /// Whether to perform a full file listing to find orphan files.
456    pub full_file_listing: bool,
457}
458
459impl Display for GcRegions {
460    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
461        write!(
462            f,
463            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
464            self.regions,
465            self.file_refs_manifest.file_refs.len(),
466            self.full_file_listing
467        )
468    }
469}
470
471/// Reply for GetFileRefs instruction.
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
473pub struct GetFileRefsReply {
474    /// The file references manifest.
475    pub file_refs_manifest: FileRefsManifest,
476    /// Whether the operation was successful.
477    pub success: bool,
478    /// Error message if any.
479    pub error: Option<String>,
480}
481
482impl Display for GetFileRefsReply {
483    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
484        write!(
485            f,
486            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
487            self.success,
488            self.file_refs_manifest.file_refs.len(),
489            self.error
490        )
491    }
492}
493
494/// Reply for GC instruction.
495#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
496pub struct GcRegionsReply {
497    pub result: Result<GcReport, String>,
498}
499
500impl Display for GcRegionsReply {
501    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
502        write!(
503            f,
504            "GcReply(result={})",
505            match &self.result {
506                Ok(report) => format!(
507                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
508                    report.deleted_files.len(),
509                    report.need_retry_regions.len()
510                ),
511                Err(err) => format!("Err({})", err),
512            }
513        )
514    }
515}
516
517#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
518pub enum Instruction {
519    /// Opens regions.
520    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
521    OpenRegions(Vec<OpenRegion>),
522    /// Closes regions.
523    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
524    CloseRegions(Vec<RegionIdent>),
525    /// Upgrades regions.
526    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
527    UpgradeRegions(Vec<UpgradeRegion>),
528    #[serde(
529        deserialize_with = "single_or_multiple_from",
530        alias = "DowngradeRegion"
531    )]
532    /// Downgrades regions.
533    DowngradeRegions(Vec<DowngradeRegion>),
534    /// Invalidates batch cache.
535    InvalidateCaches(Vec<CacheIdent>),
536    /// Flushes regions.
537    FlushRegions(FlushRegions),
538    /// Gets file references for regions.
539    GetFileRefs(GetFileRefs),
540    /// Triggers garbage collection for a region.
541    GcRegions(GcRegions),
542    /// Temporary suspend serving reads or writes
543    Suspend,
544}
545
546impl Instruction {
547    /// Converts the instruction into a vector of [OpenRegion].
548    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
549        match self {
550            Self::OpenRegions(open_regions) => Some(open_regions),
551            _ => None,
552        }
553    }
554
555    /// Converts the instruction into a vector of [RegionIdent].
556    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
557        match self {
558            Self::CloseRegions(close_regions) => Some(close_regions),
559            _ => None,
560        }
561    }
562
563    /// Converts the instruction into a [FlushRegions].
564    pub fn into_flush_regions(self) -> Option<FlushRegions> {
565        match self {
566            Self::FlushRegions(flush_regions) => Some(flush_regions),
567            _ => None,
568        }
569    }
570
571    /// Converts the instruction into a [DowngradeRegion].
572    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
573        match self {
574            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
575            _ => None,
576        }
577    }
578
579    /// Converts the instruction into a [UpgradeRegion].
580    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
581        match self {
582            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
583            _ => None,
584        }
585    }
586
587    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
588        match self {
589            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
590            _ => None,
591        }
592    }
593
594    pub fn into_gc_regions(self) -> Option<GcRegions> {
595        match self {
596            Self::GcRegions(gc_regions) => Some(gc_regions),
597            _ => None,
598        }
599    }
600}
601
602/// The reply of [UpgradeRegion].
603#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
604pub struct UpgradeRegionReply {
605    /// The [RegionId].
606    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
607    #[serde(default)]
608    pub region_id: RegionId,
609    /// Returns true if `last_entry_id` has been replayed to the latest.
610    pub ready: bool,
611    /// Indicates whether the region exists.
612    pub exists: bool,
613    /// Returns error if any.
614    pub error: Option<String>,
615}
616
617impl Display for UpgradeRegionReply {
618    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
619        write!(
620            f,
621            "(ready={}, exists={}, error={:?})",
622            self.ready, self.exists, self.error
623        )
624    }
625}
626
627#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
628pub struct DowngradeRegionsReply {
629    pub replies: Vec<DowngradeRegionReply>,
630}
631
632impl DowngradeRegionsReply {
633    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
634        Self { replies }
635    }
636
637    pub fn single(reply: DowngradeRegionReply) -> Self {
638        Self::new(vec![reply])
639    }
640}
641
642#[derive(Deserialize)]
643#[serde(untagged)]
644enum DowngradeRegionsCompat {
645    Single(DowngradeRegionReply),
646    Multiple(DowngradeRegionsReply),
647}
648
649fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
650where
651    D: Deserializer<'de>,
652{
653    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
654    Ok(match helper {
655        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
656        DowngradeRegionsCompat::Multiple(reply) => reply,
657    })
658}
659
660#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
661pub struct UpgradeRegionsReply {
662    pub replies: Vec<UpgradeRegionReply>,
663}
664
665impl UpgradeRegionsReply {
666    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
667        Self { replies }
668    }
669
670    pub fn single(reply: UpgradeRegionReply) -> Self {
671        Self::new(vec![reply])
672    }
673}
674
675#[derive(Deserialize)]
676#[serde(untagged)]
677enum UpgradeRegionsCompat {
678    Single(UpgradeRegionReply),
679    Multiple(UpgradeRegionsReply),
680}
681
682fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
683where
684    D: Deserializer<'de>,
685{
686    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
687    Ok(match helper {
688        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
689        UpgradeRegionsCompat::Multiple(reply) => reply,
690    })
691}
692
693#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
694#[serde(tag = "type", rename_all = "snake_case")]
695pub enum InstructionReply {
696    #[serde(alias = "open_region")]
697    OpenRegions(SimpleReply),
698    #[serde(alias = "close_region")]
699    CloseRegions(SimpleReply),
700    #[serde(
701        deserialize_with = "upgrade_regions_compat_from",
702        alias = "upgrade_region"
703    )]
704    UpgradeRegions(UpgradeRegionsReply),
705    #[serde(
706        alias = "downgrade_region",
707        deserialize_with = "downgrade_regions_compat_from"
708    )]
709    DowngradeRegions(DowngradeRegionsReply),
710    FlushRegions(FlushRegionReply),
711    GetFileRefs(GetFileRefsReply),
712    GcRegions(GcRegionsReply),
713}
714
715impl Display for InstructionReply {
716    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
717        match self {
718            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
719            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
720            Self::UpgradeRegions(reply) => {
721                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
722            }
723            Self::DowngradeRegions(reply) => {
724                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
725            }
726            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
727            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
728            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
729        }
730    }
731}
732
/// Test-only helpers that unwrap a specific reply variant and panic on any other.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Unwraps a `CloseRegions` reply; panics on any other variant.
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        if let Self::CloseRegions(reply) = self {
            return reply;
        }
        panic!("Expected CloseRegions reply")
    }

    /// Unwraps an `OpenRegions` reply; panics on any other variant.
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        if let Self::OpenRegions(reply) = self {
            return reply;
        }
        panic!("Expected OpenRegions reply")
    }

    /// Unwraps the replies of an `UpgradeRegions` reply; panics on any other variant.
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        if let Self::UpgradeRegions(batch) = self {
            return batch.replies;
        }
        panic!("Expected UpgradeRegion reply")
    }

    /// Unwraps the replies of a `DowngradeRegions` reply; panics on any other variant.
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        if let Self::DowngradeRegions(batch) = self {
            return batch.replies;
        }
        panic!("Expected DowngradeRegion reply")
    }

    /// Unwraps a `FlushRegions` reply; panics on any other variant.
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        if let Self::FlushRegions(reply) = self {
            return reply;
        }
        panic!("Expected FlushRegions reply")
    }
}
770
771#[cfg(test)]
772mod tests {
773    use std::collections::HashSet;
774
775    use store_api::storage::FileId;
776
777    use super::*;
778
779    #[test]
780    fn test_serialize_instruction() {
781        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
782            RegionIdent {
783                datanode_id: 2,
784                table_id: 1024,
785                region_number: 1,
786                engine: "mito2".to_string(),
787            },
788            "test/foo",
789            HashMap::new(),
790            HashMap::new(),
791            false,
792        )]);
793
794        let serialized = serde_json::to_string(&open_region).unwrap();
795        assert_eq!(
796            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
797            serialized
798        );
799
800        let close_region = Instruction::CloseRegions(vec![RegionIdent {
801            datanode_id: 2,
802            table_id: 1024,
803            region_number: 1,
804            engine: "mito2".to_string(),
805        }]);
806
807        let serialized = serde_json::to_string(&close_region).unwrap();
808        assert_eq!(
809            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
810            serialized
811        );
812
813        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
814            region_id: RegionId::new(1024, 1),
815            last_entry_id: None,
816            metadata_last_entry_id: None,
817            replay_timeout: Duration::from_millis(1000),
818            location_id: None,
819            replay_entry_id: None,
820            metadata_replay_entry_id: None,
821        }]);
822
823        let serialized = serde_json::to_string(&upgrade_region).unwrap();
824        assert_eq!(
825            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
826            serialized
827        );
828    }
829
830    #[test]
831    fn test_serialize_instruction_reply() {
832        let downgrade_region_reply = InstructionReply::DowngradeRegions(
833            DowngradeRegionsReply::single(DowngradeRegionReply {
834                region_id: RegionId::new(1024, 1),
835                last_entry_id: None,
836                metadata_last_entry_id: None,
837                exists: true,
838                error: None,
839            }),
840        );
841
842        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
843        assert_eq!(
844            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
845            serialized
846        );
847
848        let upgrade_region_reply =
849            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
850                region_id: RegionId::new(1024, 1),
851                ready: true,
852                exists: true,
853                error: None,
854            }));
855        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
856        assert_eq!(
857            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
858            serialized
859        );
860    }
861
862    #[test]
863    fn test_deserialize_instruction() {
864        // legacy open region instruction
865        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
866        let open_region_instruction: Instruction =
867            serde_json::from_str(open_region_instruction).unwrap();
868        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
869            RegionIdent {
870                datanode_id: 2,
871                table_id: 1024,
872                region_number: 1,
873                engine: "mito2".to_string(),
874            },
875            "test/foo",
876            HashMap::new(),
877            HashMap::new(),
878            false,
879        )]);
880        assert_eq!(open_region_instruction, open_region);
881
882        // legacy close region instruction
883        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
884        let close_region_instruction: Instruction =
885            serde_json::from_str(close_region_instruction).unwrap();
886        let close_region = Instruction::CloseRegions(vec![RegionIdent {
887            datanode_id: 2,
888            table_id: 1024,
889            region_number: 1,
890            engine: "mito2".to_string(),
891        }]);
892        assert_eq!(close_region_instruction, close_region);
893
894        // legacy downgrade region instruction
895        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
896        let downgrade_region_instruction: Instruction =
897            serde_json::from_str(downgrade_region_instruction).unwrap();
898        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
899            region_id: RegionId::new(1024, 1),
900            flush_timeout: Some(Duration::from_millis(1000)),
901        }]);
902        assert_eq!(downgrade_region_instruction, downgrade_region);
903
904        // legacy upgrade region instruction
905        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
906        let upgrade_region_instruction: Instruction =
907            serde_json::from_str(upgrade_region_instruction).unwrap();
908        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
909            region_id: RegionId::new(1024, 1),
910            last_entry_id: None,
911            metadata_last_entry_id: None,
912            replay_timeout: Duration::from_millis(1000),
913            location_id: None,
914            replay_entry_id: None,
915            metadata_replay_entry_id: None,
916        }]);
917        assert_eq!(upgrade_region_instruction, upgrade_region);
918    }
919
920    #[test]
921    fn test_deserialize_instruction_reply() {
922        // legacy close region reply
923        let close_region_instruction_reply =
924            r#"{"result":true,"error":null,"type":"close_region"}"#;
925        let close_region_instruction_reply: InstructionReply =
926            serde_json::from_str(close_region_instruction_reply).unwrap();
927        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
928            result: true,
929            error: None,
930        });
931        assert_eq!(close_region_instruction_reply, close_region_reply);
932
933        // legacy open region reply
934        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
935        let open_region_instruction_reply: InstructionReply =
936            serde_json::from_str(open_region_instruction_reply).unwrap();
937        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
938            result: true,
939            error: None,
940        });
941        assert_eq!(open_region_instruction_reply, open_region_reply);
942
943        // legacy downgrade region reply
944        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
945        let downgrade_region_instruction_reply: InstructionReply =
946            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
947        let downgrade_region_reply = InstructionReply::DowngradeRegions(
948            DowngradeRegionsReply::single(DowngradeRegionReply {
949                region_id: RegionId::new(1024, 1),
950                last_entry_id: None,
951                metadata_last_entry_id: None,
952                exists: true,
953                error: None,
954            }),
955        );
956        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
957
958        // legacy upgrade region reply
959        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
960        let upgrade_region_instruction_reply: InstructionReply =
961            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
962        let upgrade_region_reply =
963            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
964                region_id: RegionId::new(1024, 1),
965                ready: true,
966                exists: true,
967                error: None,
968            }));
969        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
970    }
971
972    #[derive(Debug, Clone, Serialize, Deserialize)]
973    struct LegacyOpenRegion {
974        region_ident: RegionIdent,
975        region_storage_path: String,
976        region_options: HashMap<String, String>,
977    }
978
979    #[test]
980    fn test_compatible_serialize_open_region() {
981        let region_ident = RegionIdent {
982            datanode_id: 2,
983            table_id: 1024,
984            region_number: 1,
985            engine: "mito2".to_string(),
986        };
987        let region_storage_path = "test/foo".to_string();
988        let region_options = HashMap::from([
989            ("a".to_string(), "aa".to_string()),
990            ("b".to_string(), "bb".to_string()),
991        ]);
992
993        // Serialize a legacy OpenRegion.
994        let legacy_open_region = LegacyOpenRegion {
995            region_ident: region_ident.clone(),
996            region_storage_path: region_storage_path.clone(),
997            region_options: region_options.clone(),
998        };
999        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1000
1001        // Deserialize to OpenRegion.
1002        let deserialized = serde_json::from_str(&serialized).unwrap();
1003        let expected = OpenRegion {
1004            region_ident,
1005            region_storage_path,
1006            region_options,
1007            region_wal_options: HashMap::new(),
1008            skip_wal_replay: false,
1009        };
1010        assert_eq!(expected, deserialized);
1011    }
1012
1013    #[test]
1014    fn test_flush_regions_creation() {
1015        let region_id = RegionId::new(1024, 1);
1016
1017        // Single region sync flush
1018        let single_sync = FlushRegions::sync_single(region_id);
1019        assert_eq!(single_sync.region_ids, vec![region_id]);
1020        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1021        assert!(!single_sync.is_hint());
1022        assert!(single_sync.is_sync());
1023        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1024        assert!(single_sync.is_single_region());
1025        assert_eq!(single_sync.single_region_id(), Some(region_id));
1026
1027        // Batch async flush (hint)
1028        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1029        let batch_async = FlushRegions::async_batch(region_ids.clone());
1030        assert_eq!(batch_async.region_ids, region_ids);
1031        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1032        assert!(batch_async.is_hint());
1033        assert!(!batch_async.is_sync());
1034        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1035        assert!(!batch_async.is_single_region());
1036        assert_eq!(batch_async.single_region_id(), None);
1037
1038        // Batch sync flush
1039        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1040        assert_eq!(batch_sync.region_ids, region_ids);
1041        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1042        assert!(!batch_sync.is_hint());
1043        assert!(batch_sync.is_sync());
1044        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1045    }
1046
1047    #[test]
1048    fn test_flush_regions_conversion() {
1049        let region_id = RegionId::new(1024, 1);
1050
1051        let from_region_id: FlushRegions = region_id.into();
1052        assert_eq!(from_region_id.region_ids, vec![region_id]);
1053        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1054        assert!(!from_region_id.is_hint());
1055        assert!(from_region_id.is_sync());
1056
1057        // Test default construction
1058        let flush_regions = FlushRegions {
1059            region_ids: vec![region_id],
1060            strategy: FlushStrategy::Async,
1061            error_strategy: FlushErrorStrategy::TryAll,
1062        };
1063        assert_eq!(flush_regions.region_ids, vec![region_id]);
1064        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1065        assert!(flush_regions.is_hint());
1066        assert!(!flush_regions.is_sync());
1067    }
1068
1069    #[test]
1070    fn test_flush_region_reply() {
1071        let region_id = RegionId::new(1024, 1);
1072
1073        // Successful single region reply
1074        let success_reply = FlushRegionReply::success_single(region_id);
1075        assert!(success_reply.overall_success);
1076        assert_eq!(success_reply.results.len(), 1);
1077        assert_eq!(success_reply.results[0].0, region_id);
1078        assert!(success_reply.results[0].1.is_ok());
1079
1080        // Failed single region reply
1081        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1082        assert!(!error_reply.overall_success);
1083        assert_eq!(error_reply.results.len(), 1);
1084        assert_eq!(error_reply.results[0].0, region_id);
1085        assert!(error_reply.results[0].1.is_err());
1086
1087        // Batch reply
1088        let region_id2 = RegionId::new(1024, 2);
1089        let results = vec![
1090            (region_id, Ok(())),
1091            (region_id2, Err("flush failed".to_string())),
1092        ];
1093        let batch_reply = FlushRegionReply::from_results(results);
1094        assert!(!batch_reply.overall_success);
1095        assert_eq!(batch_reply.results.len(), 2);
1096
1097        // Conversion to SimpleReply
1098        let simple_reply = batch_reply.to_simple_reply();
1099        assert!(!simple_reply.result);
1100        assert!(simple_reply.error.is_some());
1101        assert!(simple_reply.error.unwrap().contains("flush failed"));
1102    }
1103
1104    #[test]
1105    fn test_serialize_flush_regions_instruction() {
1106        let region_id = RegionId::new(1024, 1);
1107        let flush_regions = FlushRegions::sync_single(region_id);
1108        let instruction = Instruction::FlushRegions(flush_regions.clone());
1109
1110        let serialized = serde_json::to_string(&instruction).unwrap();
1111        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1112
1113        match deserialized {
1114            Instruction::FlushRegions(fr) => {
1115                assert_eq!(fr.region_ids, vec![region_id]);
1116                assert_eq!(fr.strategy, FlushStrategy::Sync);
1117                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1118            }
1119            _ => panic!("Expected FlushRegions instruction"),
1120        }
1121    }
1122
1123    #[test]
1124    fn test_serialize_flush_regions_batch_instruction() {
1125        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1126        let flush_regions =
1127            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1128        let instruction = Instruction::FlushRegions(flush_regions);
1129
1130        let serialized = serde_json::to_string(&instruction).unwrap();
1131        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1132
1133        match deserialized {
1134            Instruction::FlushRegions(fr) => {
1135                assert_eq!(fr.region_ids, region_ids);
1136                assert_eq!(fr.strategy, FlushStrategy::Sync);
1137                assert!(!fr.is_hint());
1138                assert!(fr.is_sync());
1139                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1140            }
1141            _ => panic!("Expected FlushRegions instruction"),
1142        }
1143    }
1144
1145    #[test]
1146    fn test_serialize_get_file_refs_instruction_reply() {
1147        let mut manifest = FileRefsManifest::default();
1148        let r0 = RegionId::new(1024, 1);
1149        let r1 = RegionId::new(1024, 2);
1150        manifest
1151            .file_refs
1152            .insert(r0, HashSet::from([FileId::random()]));
1153        manifest
1154            .file_refs
1155            .insert(r1, HashSet::from([FileId::random()]));
1156        manifest.manifest_version.insert(r0, 10);
1157        manifest.manifest_version.insert(r1, 20);
1158
1159        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1160            file_refs_manifest: manifest,
1161            success: true,
1162            error: None,
1163        });
1164
1165        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1166        let deserialized = serde_json::from_str(&serialized).unwrap();
1167
1168        assert_eq!(instruction_reply, deserialized);
1169    }
1170}