common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// The [RegionId].
59    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
60    #[serde(default)]
61    pub region_id: RegionId,
62    /// Returns the `last_entry_id` if available.
63    pub last_entry_id: Option<u64>,
64    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
65    pub metadata_last_entry_id: Option<u64>,
66    /// Indicates whether the region exists.
67    pub exists: bool,
68    /// Return error if any during the operation.
69    pub error: Option<String>,
70}
71
72impl Display for DowngradeRegionReply {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        write!(
75            f,
76            "(last_entry_id={:?}, exists={}, error={:?})",
77            self.last_entry_id, self.exists, self.error
78        )
79    }
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
83pub struct SimpleReply {
84    pub result: bool,
85    pub error: Option<String>,
86}
87
88/// Reply for flush region operations with support for batch results.
89#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
90pub struct FlushRegionReply {
91    /// Results for each region that was attempted to be flushed.
92    /// For single region flushes, this will contain one result.
93    /// For batch flushes, this contains results for all attempted regions.
94    pub results: Vec<(RegionId, Result<(), String>)>,
95    /// Overall success: true if all regions were flushed successfully.
96    pub overall_success: bool,
97}
98
99impl FlushRegionReply {
100    /// Create a successful single region reply.
101    pub fn success_single(region_id: RegionId) -> Self {
102        Self {
103            results: vec![(region_id, Ok(()))],
104            overall_success: true,
105        }
106    }
107
108    /// Create a failed single region reply.
109    pub fn error_single(region_id: RegionId, error: String) -> Self {
110        Self {
111            results: vec![(region_id, Err(error))],
112            overall_success: false,
113        }
114    }
115
116    /// Create a batch reply from individual results.
117    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118        let overall_success = results.iter().all(|(_, result)| result.is_ok());
119        Self {
120            results,
121            overall_success,
122        }
123    }
124
125    /// Convert to SimpleReply for backward compatibility.
126    pub fn to_simple_reply(&self) -> SimpleReply {
127        if self.overall_success {
128            SimpleReply {
129                result: true,
130                error: None,
131            }
132        } else {
133            let errors: Vec<String> = self
134                .results
135                .iter()
136                .filter_map(|(region_id, result)| {
137                    result
138                        .as_ref()
139                        .err()
140                        .map(|err| format!("{}: {}", region_id, err))
141                })
142                .collect();
143            SimpleReply {
144                result: false,
145                error: Some(errors.join("; ")),
146            }
147        }
148    }
149}
150
151impl Display for SimpleReply {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(f, "(result={}, error={:?})", self.result, self.error)
154    }
155}
156
157impl Display for FlushRegionReply {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        let results_str = self
160            .results
161            .iter()
162            .map(|(region_id, result)| match result {
163                Ok(()) => format!("{}:OK", region_id),
164                Err(err) => format!("{}:ERR({})", region_id, err),
165            })
166            .collect::<Vec<_>>()
167            .join(", ");
168        write!(
169            f,
170            "(overall_success={}, results=[{}])",
171            self.overall_success, results_str
172        )
173    }
174}
175
176impl Display for OpenRegion {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "OpenRegion(region_ident={}, region_storage_path={})",
181            self.region_ident, self.region_storage_path
182        )
183    }
184}
185
186#[serde_with::serde_as]
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub struct OpenRegion {
189    pub region_ident: RegionIdent,
190    pub region_storage_path: String,
191    pub region_options: HashMap<String, String>,
192    #[serde(default)]
193    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
194    pub region_wal_options: HashMap<RegionNumber, String>,
195    #[serde(default)]
196    pub skip_wal_replay: bool,
197}
198
199impl OpenRegion {
200    pub fn new(
201        region_ident: RegionIdent,
202        path: &str,
203        region_options: HashMap<String, String>,
204        region_wal_options: HashMap<RegionNumber, String>,
205        skip_wal_replay: bool,
206    ) -> Self {
207        Self {
208            region_ident,
209            region_storage_path: path.to_string(),
210            region_options,
211            region_wal_options,
212            skip_wal_replay,
213        }
214    }
215}
216
217/// The instruction of downgrading leader region.
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
219pub struct DowngradeRegion {
220    /// The [RegionId].
221    pub region_id: RegionId,
222    /// The timeout of waiting for flush the region.
223    ///
224    /// `None` stands for don't flush before downgrading the region.
225    #[serde(default)]
226    pub flush_timeout: Option<Duration>,
227}
228
229impl Display for DowngradeRegion {
230    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231        write!(
232            f,
233            "DowngradeRegion(region_id={}, flush_timeout={:?})",
234            self.region_id, self.flush_timeout,
235        )
236    }
237}
238
239/// Upgrades a follower region to leader region.
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
241pub struct UpgradeRegion {
242    /// The [RegionId].
243    pub region_id: RegionId,
244    /// The `last_entry_id` of old leader region.
245    pub last_entry_id: Option<u64>,
246    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
247    pub metadata_last_entry_id: Option<u64>,
248    /// The timeout of waiting for a wal replay.
249    ///
250    /// `None` stands for no wait,
251    /// it's helpful to verify whether the leader region is ready.
252    #[serde(with = "humantime_serde")]
253    pub replay_timeout: Duration,
254    /// The hint for replaying memtable.
255    #[serde(default)]
256    pub location_id: Option<u64>,
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub replay_entry_id: Option<u64>,
259    #[serde(default, skip_serializing_if = "Option::is_none")]
260    pub metadata_replay_entry_id: Option<u64>,
261}
262
263impl UpgradeRegion {
264    /// Sets the replay entry id.
265    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266        self.replay_entry_id = replay_entry_id;
267        self
268    }
269
270    /// Sets the metadata replay entry id.
271    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272        self.metadata_replay_entry_id = metadata_replay_entry_id;
273        self
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278/// The identifier of cache.
279pub enum CacheIdent {
280    FlowId(FlowId),
281    /// Indicate change of address of flownode.
282    FlowNodeAddressChange(u64),
283    FlowName(FlowName),
284    TableId(TableId),
285    TableName(TableName),
286    SchemaName(SchemaName),
287    CreateFlow(CreateFlow),
288    DropFlow(DropFlow),
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
292pub struct CreateFlow {
293    /// The unique identifier for the flow.
294    pub flow_id: FlowId,
295    pub source_table_ids: Vec<TableId>,
296    /// Mapping of flow partition to peer information
297    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
301pub struct DropFlow {
302    pub flow_id: FlowId,
303    pub source_table_ids: Vec<TableId>,
304    /// Mapping of flow partition to flownode id
305    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
306}
307
308/// Strategy for executing flush operations.
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
310pub enum FlushStrategy {
311    /// Synchronous operation that waits for completion and expects a reply
312    #[default]
313    Sync,
314    /// Asynchronous hint operation (fire-and-forget, no reply expected)
315    Async,
316}
317
318/// Error handling strategy for batch flush operations.
319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
320pub enum FlushErrorStrategy {
321    /// Abort on first error (fail-fast)
322    #[default]
323    FailFast,
324    /// Attempt to flush all regions and collect all errors
325    TryAll,
326}
327
328/// Unified flush instruction supporting both single and batch operations
329/// with configurable execution strategies and error handling.
330#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
331pub struct FlushRegions {
332    /// List of region IDs to flush. Can contain a single region or multiple regions.
333    pub region_ids: Vec<RegionId>,
334    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
335    #[serde(default)]
336    pub strategy: FlushStrategy,
337    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
338    #[serde(default)]
339    pub error_strategy: FlushErrorStrategy,
340}
341
342impl FlushRegions {
343    /// Create synchronous single-region flush
344    pub fn sync_single(region_id: RegionId) -> Self {
345        Self {
346            region_ids: vec![region_id],
347            strategy: FlushStrategy::Sync,
348            error_strategy: FlushErrorStrategy::FailFast,
349        }
350    }
351
352    /// Create asynchronous batch flush (fire-and-forget)
353    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
354        Self {
355            region_ids,
356            strategy: FlushStrategy::Async,
357            error_strategy: FlushErrorStrategy::TryAll,
358        }
359    }
360
361    /// Create synchronous batch flush with error strategy
362    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
363        Self {
364            region_ids,
365            strategy: FlushStrategy::Sync,
366            error_strategy,
367        }
368    }
369
370    /// Check if this is a single region flush.
371    pub fn is_single_region(&self) -> bool {
372        self.region_ids.len() == 1
373    }
374
375    /// Get the single region ID if this is a single region flush.
376    pub fn single_region_id(&self) -> Option<RegionId> {
377        if self.is_single_region() {
378            self.region_ids.first().copied()
379        } else {
380            None
381        }
382    }
383
384    /// Check if this is a hint (asynchronous) operation.
385    pub fn is_hint(&self) -> bool {
386        matches!(self.strategy, FlushStrategy::Async)
387    }
388
389    /// Check if this is a synchronous operation.
390    pub fn is_sync(&self) -> bool {
391        matches!(self.strategy, FlushStrategy::Sync)
392    }
393}
394
395impl From<RegionId> for FlushRegions {
396    fn from(region_id: RegionId) -> Self {
397        Self::sync_single(region_id)
398    }
399}
400
401#[derive(Debug, Deserialize)]
402#[serde(untagged)]
403enum SingleOrMultiple<T> {
404    Single(T),
405    Multiple(Vec<T>),
406}
407
408fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
409where
410    D: Deserializer<'de>,
411    T: Deserialize<'de>,
412{
413    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
414    Ok(match helper {
415        SingleOrMultiple::Single(x) => vec![x],
416        SingleOrMultiple::Multiple(xs) => xs,
417    })
418}
419
420/// Instruction to get file references for specified regions.
421#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
422pub struct GetFileRefs {
423    /// List of region IDs to get file references for.
424    pub region_ids: Vec<RegionId>,
425}
426
427impl Display for GetFileRefs {
428    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
429        write!(f, "GetFileRefs(region_ids={:?})", self.region_ids)
430    }
431}
432
433/// Instruction to trigger garbage collection for a region.
434#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
435pub struct GcRegions {
436    /// The region ID to perform GC on.
437    pub regions: Vec<RegionId>,
438    /// The file references manifest containing temporary file references.
439    pub file_refs_manifest: FileRefsManifest,
440    /// Whether to perform a full file listing to find orphan files.
441    pub full_file_listing: bool,
442}
443
444impl Display for GcRegions {
445    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
446        write!(
447            f,
448            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
449            self.regions,
450            self.file_refs_manifest.file_refs.len(),
451            self.full_file_listing
452        )
453    }
454}
455
456/// Reply for GetFileRefs instruction.
457#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
458pub struct GetFileRefsReply {
459    /// The file references manifest.
460    pub file_refs_manifest: FileRefsManifest,
461    /// Whether the operation was successful.
462    pub success: bool,
463    /// Error message if any.
464    pub error: Option<String>,
465}
466
467impl Display for GetFileRefsReply {
468    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
469        write!(
470            f,
471            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
472            self.success,
473            self.file_refs_manifest.file_refs.len(),
474            self.error
475        )
476    }
477}
478
479/// Reply for GC instruction.
480#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
481pub struct GcRegionsReply {
482    pub result: Result<GcReport, String>,
483}
484
485impl Display for GcRegionsReply {
486    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
487        write!(
488            f,
489            "GcReply(result={})",
490            match &self.result {
491                Ok(report) => format!(
492                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
493                    report.deleted_files.len(),
494                    report.need_retry_regions.len()
495                ),
496                Err(err) => format!("Err({})", err),
497            }
498        )
499    }
500}
501
502#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
503pub enum Instruction {
504    /// Opens regions.
505    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
506    OpenRegions(Vec<OpenRegion>),
507    /// Closes regions.
508    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
509    CloseRegions(Vec<RegionIdent>),
510    /// Upgrades regions.
511    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
512    UpgradeRegions(Vec<UpgradeRegion>),
513    #[serde(
514        deserialize_with = "single_or_multiple_from",
515        alias = "DowngradeRegion"
516    )]
517    /// Downgrades regions.
518    DowngradeRegions(Vec<DowngradeRegion>),
519    /// Invalidates batch cache.
520    InvalidateCaches(Vec<CacheIdent>),
521    /// Flushes regions.
522    FlushRegions(FlushRegions),
523    /// Gets file references for regions.
524    GetFileRefs(GetFileRefs),
525    /// Triggers garbage collection for a region.
526    GcRegions(GcRegions),
527}
528
529impl Instruction {
530    /// Converts the instruction into a vector of [OpenRegion].
531    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
532        match self {
533            Self::OpenRegions(open_regions) => Some(open_regions),
534            _ => None,
535        }
536    }
537
538    /// Converts the instruction into a vector of [RegionIdent].
539    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
540        match self {
541            Self::CloseRegions(close_regions) => Some(close_regions),
542            _ => None,
543        }
544    }
545
546    /// Converts the instruction into a [FlushRegions].
547    pub fn into_flush_regions(self) -> Option<FlushRegions> {
548        match self {
549            Self::FlushRegions(flush_regions) => Some(flush_regions),
550            _ => None,
551        }
552    }
553
554    /// Converts the instruction into a [DowngradeRegion].
555    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
556        match self {
557            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
558            _ => None,
559        }
560    }
561
562    /// Converts the instruction into a [UpgradeRegion].
563    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
564        match self {
565            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
566            _ => None,
567        }
568    }
569
570    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
571        match self {
572            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
573            _ => None,
574        }
575    }
576
577    pub fn into_gc_regions(self) -> Option<GcRegions> {
578        match self {
579            Self::GcRegions(gc_regions) => Some(gc_regions),
580            _ => None,
581        }
582    }
583}
584
585/// The reply of [UpgradeRegion].
586#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
587pub struct UpgradeRegionReply {
588    /// The [RegionId].
589    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
590    #[serde(default)]
591    pub region_id: RegionId,
592    /// Returns true if `last_entry_id` has been replayed to the latest.
593    pub ready: bool,
594    /// Indicates whether the region exists.
595    pub exists: bool,
596    /// Returns error if any.
597    pub error: Option<String>,
598}
599
600impl Display for UpgradeRegionReply {
601    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
602        write!(
603            f,
604            "(ready={}, exists={}, error={:?})",
605            self.ready, self.exists, self.error
606        )
607    }
608}
609
610#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
611pub struct DowngradeRegionsReply {
612    pub replies: Vec<DowngradeRegionReply>,
613}
614
615impl DowngradeRegionsReply {
616    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
617        Self { replies }
618    }
619
620    pub fn single(reply: DowngradeRegionReply) -> Self {
621        Self::new(vec![reply])
622    }
623}
624
625#[derive(Deserialize)]
626#[serde(untagged)]
627enum DowngradeRegionsCompat {
628    Single(DowngradeRegionReply),
629    Multiple(DowngradeRegionsReply),
630}
631
632fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
633where
634    D: Deserializer<'de>,
635{
636    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
637    Ok(match helper {
638        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
639        DowngradeRegionsCompat::Multiple(reply) => reply,
640    })
641}
642
643#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
644pub struct UpgradeRegionsReply {
645    pub replies: Vec<UpgradeRegionReply>,
646}
647
648impl UpgradeRegionsReply {
649    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
650        Self { replies }
651    }
652
653    pub fn single(reply: UpgradeRegionReply) -> Self {
654        Self::new(vec![reply])
655    }
656}
657
658#[derive(Deserialize)]
659#[serde(untagged)]
660enum UpgradeRegionsCompat {
661    Single(UpgradeRegionReply),
662    Multiple(UpgradeRegionsReply),
663}
664
665fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
666where
667    D: Deserializer<'de>,
668{
669    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
670    Ok(match helper {
671        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
672        UpgradeRegionsCompat::Multiple(reply) => reply,
673    })
674}
675
676#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
677#[serde(tag = "type", rename_all = "snake_case")]
678pub enum InstructionReply {
679    #[serde(alias = "open_region")]
680    OpenRegions(SimpleReply),
681    #[serde(alias = "close_region")]
682    CloseRegions(SimpleReply),
683    #[serde(
684        deserialize_with = "upgrade_regions_compat_from",
685        alias = "upgrade_region"
686    )]
687    UpgradeRegions(UpgradeRegionsReply),
688    #[serde(
689        alias = "downgrade_region",
690        deserialize_with = "downgrade_regions_compat_from"
691    )]
692    DowngradeRegions(DowngradeRegionsReply),
693    FlushRegions(FlushRegionReply),
694    GetFileRefs(GetFileRefsReply),
695    GcRegions(GcRegionsReply),
696}
697
698impl Display for InstructionReply {
699    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
700        match self {
701            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
702            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
703            Self::UpgradeRegions(reply) => {
704                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
705            }
706            Self::DowngradeRegions(reply) => {
707                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
708            }
709            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
710            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
711            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
712        }
713    }
714}
715
/// Test-only helpers that unwrap a specific reply variant.
///
/// # Panics
///
/// Each helper panics when `self` is a different variant; the panic message
/// names the expected variant and shows the reply that was actually received.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Unwraps a `CloseRegions` reply.
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        match self {
            Self::CloseRegions(reply) => reply,
            other => panic!("Expected CloseRegions reply, got {}", other),
        }
    }

    /// Unwraps an `OpenRegions` reply.
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        match self {
            Self::OpenRegions(reply) => reply,
            other => panic!("Expected OpenRegions reply, got {}", other),
        }
    }

    /// Unwraps the reply list of an `UpgradeRegions` reply.
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        match self {
            Self::UpgradeRegions(reply) => reply.replies,
            other => panic!("Expected UpgradeRegion reply, got {}", other),
        }
    }

    /// Unwraps the reply list of a `DowngradeRegions` reply.
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        match self {
            Self::DowngradeRegions(reply) => reply.replies,
            other => panic!("Expected DowngradeRegion reply, got {}", other),
        }
    }

    /// Unwraps a `FlushRegions` reply.
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        match self {
            Self::FlushRegions(reply) => reply,
            other => panic!("Expected FlushRegions reply, got {}", other),
        }
    }
}
753
754#[cfg(test)]
755mod tests {
756    use std::collections::HashSet;
757
758    use store_api::storage::FileId;
759
760    use super::*;
761
    /// Pins the JSON produced for the open/close/upgrade instructions.
    #[test]
    fn test_serialize_instruction() {
        // Open: serialized under the plural `OpenRegions` key as a list.
        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
            RegionIdent {
                datanode_id: 2,
                table_id: 1024,
                region_number: 1,
                engine: "mito2".to_string(),
            },
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
        )]);

        let serialized = serde_json::to_string(&open_region).unwrap();
        assert_eq!(
            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
            serialized
        );

        // Close: serialized under the plural `CloseRegions` key as a list.
        let close_region = Instruction::CloseRegions(vec![RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        }]);

        let serialized = serde_json::to_string(&close_region).unwrap();
        assert_eq!(
            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
            serialized
        );

        // Upgrade: the timeout is written in humantime form ("1s") and the
        // two `*replay_entry_id` fields are omitted because they are `None`.
        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id: RegionId::new(1024, 1),
            last_entry_id: None,
            metadata_last_entry_id: None,
            replay_timeout: Duration::from_millis(1000),
            location_id: None,
            replay_entry_id: None,
            metadata_replay_entry_id: None,
        }]);

        let serialized = serde_json::to_string(&upgrade_region).unwrap();
        assert_eq!(
            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
            serialized
        );
    }
812
    /// Pins the JSON produced for the downgrade/upgrade replies: a `type` tag
    /// with the snake_case variant name next to the `replies` list.
    #[test]
    fn test_serialize_instruction_reply() {
        let downgrade_region_reply = InstructionReply::DowngradeRegions(
            DowngradeRegionsReply::single(DowngradeRegionReply {
                region_id: RegionId::new(1024, 1),
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: true,
                error: None,
            }),
        );

        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
        assert_eq!(
            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
            serialized
        );

        let upgrade_region_reply =
            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
                region_id: RegionId::new(1024, 1),
                ready: true,
                exists: true,
                error: None,
            }));
        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
        assert_eq!(
            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
            serialized
        );
    }
844
845    #[test]
846    fn test_deserialize_instruction() {
847        // legacy open region instruction
848        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
849        let open_region_instruction: Instruction =
850            serde_json::from_str(open_region_instruction).unwrap();
851        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
852            RegionIdent {
853                datanode_id: 2,
854                table_id: 1024,
855                region_number: 1,
856                engine: "mito2".to_string(),
857            },
858            "test/foo",
859            HashMap::new(),
860            HashMap::new(),
861            false,
862        )]);
863        assert_eq!(open_region_instruction, open_region);
864
865        // legacy close region instruction
866        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
867        let close_region_instruction: Instruction =
868            serde_json::from_str(close_region_instruction).unwrap();
869        let close_region = Instruction::CloseRegions(vec![RegionIdent {
870            datanode_id: 2,
871            table_id: 1024,
872            region_number: 1,
873            engine: "mito2".to_string(),
874        }]);
875        assert_eq!(close_region_instruction, close_region);
876
877        // legacy downgrade region instruction
878        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
879        let downgrade_region_instruction: Instruction =
880            serde_json::from_str(downgrade_region_instruction).unwrap();
881        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
882            region_id: RegionId::new(1024, 1),
883            flush_timeout: Some(Duration::from_millis(1000)),
884        }]);
885        assert_eq!(downgrade_region_instruction, downgrade_region);
886
887        // legacy upgrade region instruction
888        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
889        let upgrade_region_instruction: Instruction =
890            serde_json::from_str(upgrade_region_instruction).unwrap();
891        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
892            region_id: RegionId::new(1024, 1),
893            last_entry_id: None,
894            metadata_last_entry_id: None,
895            replay_timeout: Duration::from_millis(1000),
896            location_id: None,
897            replay_entry_id: None,
898            metadata_replay_entry_id: None,
899        }]);
900        assert_eq!(upgrade_region_instruction, upgrade_region);
901    }
902
903    #[test]
904    fn test_deserialize_instruction_reply() {
905        // legacy close region reply
906        let close_region_instruction_reply =
907            r#"{"result":true,"error":null,"type":"close_region"}"#;
908        let close_region_instruction_reply: InstructionReply =
909            serde_json::from_str(close_region_instruction_reply).unwrap();
910        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
911            result: true,
912            error: None,
913        });
914        assert_eq!(close_region_instruction_reply, close_region_reply);
915
916        // legacy open region reply
917        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
918        let open_region_instruction_reply: InstructionReply =
919            serde_json::from_str(open_region_instruction_reply).unwrap();
920        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
921            result: true,
922            error: None,
923        });
924        assert_eq!(open_region_instruction_reply, open_region_reply);
925
926        // legacy downgrade region reply
927        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
928        let downgrade_region_instruction_reply: InstructionReply =
929            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
930        let downgrade_region_reply = InstructionReply::DowngradeRegions(
931            DowngradeRegionsReply::single(DowngradeRegionReply {
932                region_id: RegionId::new(1024, 1),
933                last_entry_id: None,
934                metadata_last_entry_id: None,
935                exists: true,
936                error: None,
937            }),
938        );
939        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
940
941        // legacy upgrade region reply
942        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
943        let upgrade_region_instruction_reply: InstructionReply =
944            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
945        let upgrade_region_reply =
946            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
947                region_id: RegionId::new(1024, 1),
948                ready: true,
949                exists: true,
950                error: None,
951            }));
952        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
953    }
954
955    #[derive(Debug, Clone, Serialize, Deserialize)]
956    struct LegacyOpenRegion {
957        region_ident: RegionIdent,
958        region_storage_path: String,
959        region_options: HashMap<String, String>,
960    }
961
962    #[test]
963    fn test_compatible_serialize_open_region() {
964        let region_ident = RegionIdent {
965            datanode_id: 2,
966            table_id: 1024,
967            region_number: 1,
968            engine: "mito2".to_string(),
969        };
970        let region_storage_path = "test/foo".to_string();
971        let region_options = HashMap::from([
972            ("a".to_string(), "aa".to_string()),
973            ("b".to_string(), "bb".to_string()),
974        ]);
975
976        // Serialize a legacy OpenRegion.
977        let legacy_open_region = LegacyOpenRegion {
978            region_ident: region_ident.clone(),
979            region_storage_path: region_storage_path.clone(),
980            region_options: region_options.clone(),
981        };
982        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
983
984        // Deserialize to OpenRegion.
985        let deserialized = serde_json::from_str(&serialized).unwrap();
986        let expected = OpenRegion {
987            region_ident,
988            region_storage_path,
989            region_options,
990            region_wal_options: HashMap::new(),
991            skip_wal_replay: false,
992        };
993        assert_eq!(expected, deserialized);
994    }
995
996    #[test]
997    fn test_flush_regions_creation() {
998        let region_id = RegionId::new(1024, 1);
999
1000        // Single region sync flush
1001        let single_sync = FlushRegions::sync_single(region_id);
1002        assert_eq!(single_sync.region_ids, vec![region_id]);
1003        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1004        assert!(!single_sync.is_hint());
1005        assert!(single_sync.is_sync());
1006        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1007        assert!(single_sync.is_single_region());
1008        assert_eq!(single_sync.single_region_id(), Some(region_id));
1009
1010        // Batch async flush (hint)
1011        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1012        let batch_async = FlushRegions::async_batch(region_ids.clone());
1013        assert_eq!(batch_async.region_ids, region_ids);
1014        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1015        assert!(batch_async.is_hint());
1016        assert!(!batch_async.is_sync());
1017        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1018        assert!(!batch_async.is_single_region());
1019        assert_eq!(batch_async.single_region_id(), None);
1020
1021        // Batch sync flush
1022        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1023        assert_eq!(batch_sync.region_ids, region_ids);
1024        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1025        assert!(!batch_sync.is_hint());
1026        assert!(batch_sync.is_sync());
1027        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1028    }
1029
1030    #[test]
1031    fn test_flush_regions_conversion() {
1032        let region_id = RegionId::new(1024, 1);
1033
1034        let from_region_id: FlushRegions = region_id.into();
1035        assert_eq!(from_region_id.region_ids, vec![region_id]);
1036        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1037        assert!(!from_region_id.is_hint());
1038        assert!(from_region_id.is_sync());
1039
1040        // Test default construction
1041        let flush_regions = FlushRegions {
1042            region_ids: vec![region_id],
1043            strategy: FlushStrategy::Async,
1044            error_strategy: FlushErrorStrategy::TryAll,
1045        };
1046        assert_eq!(flush_regions.region_ids, vec![region_id]);
1047        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1048        assert!(flush_regions.is_hint());
1049        assert!(!flush_regions.is_sync());
1050    }
1051
1052    #[test]
1053    fn test_flush_region_reply() {
1054        let region_id = RegionId::new(1024, 1);
1055
1056        // Successful single region reply
1057        let success_reply = FlushRegionReply::success_single(region_id);
1058        assert!(success_reply.overall_success);
1059        assert_eq!(success_reply.results.len(), 1);
1060        assert_eq!(success_reply.results[0].0, region_id);
1061        assert!(success_reply.results[0].1.is_ok());
1062
1063        // Failed single region reply
1064        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1065        assert!(!error_reply.overall_success);
1066        assert_eq!(error_reply.results.len(), 1);
1067        assert_eq!(error_reply.results[0].0, region_id);
1068        assert!(error_reply.results[0].1.is_err());
1069
1070        // Batch reply
1071        let region_id2 = RegionId::new(1024, 2);
1072        let results = vec![
1073            (region_id, Ok(())),
1074            (region_id2, Err("flush failed".to_string())),
1075        ];
1076        let batch_reply = FlushRegionReply::from_results(results);
1077        assert!(!batch_reply.overall_success);
1078        assert_eq!(batch_reply.results.len(), 2);
1079
1080        // Conversion to SimpleReply
1081        let simple_reply = batch_reply.to_simple_reply();
1082        assert!(!simple_reply.result);
1083        assert!(simple_reply.error.is_some());
1084        assert!(simple_reply.error.unwrap().contains("flush failed"));
1085    }
1086
1087    #[test]
1088    fn test_serialize_flush_regions_instruction() {
1089        let region_id = RegionId::new(1024, 1);
1090        let flush_regions = FlushRegions::sync_single(region_id);
1091        let instruction = Instruction::FlushRegions(flush_regions.clone());
1092
1093        let serialized = serde_json::to_string(&instruction).unwrap();
1094        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1095
1096        match deserialized {
1097            Instruction::FlushRegions(fr) => {
1098                assert_eq!(fr.region_ids, vec![region_id]);
1099                assert_eq!(fr.strategy, FlushStrategy::Sync);
1100                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1101            }
1102            _ => panic!("Expected FlushRegions instruction"),
1103        }
1104    }
1105
1106    #[test]
1107    fn test_serialize_flush_regions_batch_instruction() {
1108        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1109        let flush_regions =
1110            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1111        let instruction = Instruction::FlushRegions(flush_regions);
1112
1113        let serialized = serde_json::to_string(&instruction).unwrap();
1114        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1115
1116        match deserialized {
1117            Instruction::FlushRegions(fr) => {
1118                assert_eq!(fr.region_ids, region_ids);
1119                assert_eq!(fr.strategy, FlushStrategy::Sync);
1120                assert!(!fr.is_hint());
1121                assert!(fr.is_sync());
1122                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1123            }
1124            _ => panic!("Expected FlushRegions instruction"),
1125        }
1126    }
1127
1128    #[test]
1129    fn test_serialize_get_file_refs_instruction_reply() {
1130        let mut manifest = FileRefsManifest::default();
1131        let r0 = RegionId::new(1024, 1);
1132        let r1 = RegionId::new(1024, 2);
1133        manifest
1134            .file_refs
1135            .insert(r0, HashSet::from([FileId::random()]));
1136        manifest
1137            .file_refs
1138            .insert(r1, HashSet::from([FileId::random()]));
1139        manifest.manifest_version.insert(r0, 10);
1140        manifest.manifest_version.insert(r1, 20);
1141
1142        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1143            file_refs_manifest: manifest,
1144            success: true,
1145            error: None,
1146        });
1147
1148        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1149        let deserialized = serde_json::from_str(&serialized).unwrap();
1150
1151        assert_eq!(instruction_reply, deserialized);
1152    }
1153}