common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// The [RegionId].
59    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
60    #[serde(default)]
61    pub region_id: RegionId,
62    /// Returns the `last_entry_id` if available.
63    pub last_entry_id: Option<u64>,
64    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
65    pub metadata_last_entry_id: Option<u64>,
66    /// Indicates whether the region exists.
67    pub exists: bool,
68    /// Return error if any during the operation.
69    pub error: Option<String>,
70}
71
72impl Display for DowngradeRegionReply {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        write!(
75            f,
76            "(last_entry_id={:?}, exists={}, error={:?})",
77            self.last_entry_id, self.exists, self.error
78        )
79    }
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
83pub struct SimpleReply {
84    pub result: bool,
85    pub error: Option<String>,
86}
87
88/// Reply for flush region operations with support for batch results.
89#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
90pub struct FlushRegionReply {
91    /// Results for each region that was attempted to be flushed.
92    /// For single region flushes, this will contain one result.
93    /// For batch flushes, this contains results for all attempted regions.
94    pub results: Vec<(RegionId, Result<(), String>)>,
95    /// Overall success: true if all regions were flushed successfully.
96    pub overall_success: bool,
97}
98
99impl FlushRegionReply {
100    /// Create a successful single region reply.
101    pub fn success_single(region_id: RegionId) -> Self {
102        Self {
103            results: vec![(region_id, Ok(()))],
104            overall_success: true,
105        }
106    }
107
108    /// Create a failed single region reply.
109    pub fn error_single(region_id: RegionId, error: String) -> Self {
110        Self {
111            results: vec![(region_id, Err(error))],
112            overall_success: false,
113        }
114    }
115
116    /// Create a batch reply from individual results.
117    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118        let overall_success = results.iter().all(|(_, result)| result.is_ok());
119        Self {
120            results,
121            overall_success,
122        }
123    }
124
125    /// Convert to SimpleReply for backward compatibility.
126    pub fn to_simple_reply(&self) -> SimpleReply {
127        if self.overall_success {
128            SimpleReply {
129                result: true,
130                error: None,
131            }
132        } else {
133            let errors: Vec<String> = self
134                .results
135                .iter()
136                .filter_map(|(region_id, result)| {
137                    result
138                        .as_ref()
139                        .err()
140                        .map(|err| format!("{}: {}", region_id, err))
141                })
142                .collect();
143            SimpleReply {
144                result: false,
145                error: Some(errors.join("; ")),
146            }
147        }
148    }
149}
150
151impl Display for SimpleReply {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(f, "(result={}, error={:?})", self.result, self.error)
154    }
155}
156
157impl Display for FlushRegionReply {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        let results_str = self
160            .results
161            .iter()
162            .map(|(region_id, result)| match result {
163                Ok(()) => format!("{}:OK", region_id),
164                Err(err) => format!("{}:ERR({})", region_id, err),
165            })
166            .collect::<Vec<_>>()
167            .join(", ");
168        write!(
169            f,
170            "(overall_success={}, results=[{}])",
171            self.overall_success, results_str
172        )
173    }
174}
175
176impl Display for OpenRegion {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "OpenRegion(region_ident={}, region_storage_path={})",
181            self.region_ident, self.region_storage_path
182        )
183    }
184}
185
186#[serde_with::serde_as]
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub struct OpenRegion {
189    pub region_ident: RegionIdent,
190    pub region_storage_path: String,
191    pub region_options: HashMap<String, String>,
192    #[serde(default)]
193    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
194    pub region_wal_options: HashMap<RegionNumber, String>,
195    #[serde(default)]
196    pub skip_wal_replay: bool,
197}
198
199impl OpenRegion {
200    pub fn new(
201        region_ident: RegionIdent,
202        path: &str,
203        region_options: HashMap<String, String>,
204        region_wal_options: HashMap<RegionNumber, String>,
205        skip_wal_replay: bool,
206    ) -> Self {
207        Self {
208            region_ident,
209            region_storage_path: path.to_string(),
210            region_options,
211            region_wal_options,
212            skip_wal_replay,
213        }
214    }
215}
216
217/// The instruction of downgrading leader region.
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
219pub struct DowngradeRegion {
220    /// The [RegionId].
221    pub region_id: RegionId,
222    /// The timeout of waiting for flush the region.
223    ///
224    /// `None` stands for don't flush before downgrading the region.
225    #[serde(default)]
226    pub flush_timeout: Option<Duration>,
227}
228
229impl Display for DowngradeRegion {
230    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231        write!(
232            f,
233            "DowngradeRegion(region_id={}, flush_timeout={:?})",
234            self.region_id, self.flush_timeout,
235        )
236    }
237}
238
239/// Upgrades a follower region to leader region.
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
241pub struct UpgradeRegion {
242    /// The [RegionId].
243    pub region_id: RegionId,
244    /// The `last_entry_id` of old leader region.
245    pub last_entry_id: Option<u64>,
246    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
247    pub metadata_last_entry_id: Option<u64>,
248    /// The timeout of waiting for a wal replay.
249    ///
250    /// `None` stands for no wait,
251    /// it's helpful to verify whether the leader region is ready.
252    #[serde(with = "humantime_serde")]
253    pub replay_timeout: Duration,
254    /// The hint for replaying memtable.
255    #[serde(default)]
256    pub location_id: Option<u64>,
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub replay_entry_id: Option<u64>,
259    #[serde(default, skip_serializing_if = "Option::is_none")]
260    pub metadata_replay_entry_id: Option<u64>,
261}
262
263impl UpgradeRegion {
264    /// Sets the replay entry id.
265    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266        self.replay_entry_id = replay_entry_id;
267        self
268    }
269
270    /// Sets the metadata replay entry id.
271    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272        self.metadata_replay_entry_id = metadata_replay_entry_id;
273        self
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278/// The identifier of cache.
279pub enum CacheIdent {
280    FlowId(FlowId),
281    /// Indicate change of address of flownode.
282    FlowNodeAddressChange(u64),
283    FlowName(FlowName),
284    TableId(TableId),
285    TableName(TableName),
286    SchemaName(SchemaName),
287    CreateFlow(CreateFlow),
288    DropFlow(DropFlow),
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
292pub struct CreateFlow {
293    /// The unique identifier for the flow.
294    pub flow_id: FlowId,
295    pub source_table_ids: Vec<TableId>,
296    /// Mapping of flow partition to peer information
297    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
301pub struct DropFlow {
302    pub flow_id: FlowId,
303    pub source_table_ids: Vec<TableId>,
304    /// Mapping of flow partition to flownode id
305    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
306}
307
308/// Strategy for executing flush operations.
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
310pub enum FlushStrategy {
311    /// Synchronous operation that waits for completion and expects a reply
312    #[default]
313    Sync,
314    /// Asynchronous hint operation (fire-and-forget, no reply expected)
315    Async,
316}
317
318/// Error handling strategy for batch flush operations.
319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
320pub enum FlushErrorStrategy {
321    /// Abort on first error (fail-fast)
322    #[default]
323    FailFast,
324    /// Attempt to flush all regions and collect all errors
325    TryAll,
326}
327
328/// Unified flush instruction supporting both single and batch operations
329/// with configurable execution strategies and error handling.
330#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
331pub struct FlushRegions {
332    /// List of region IDs to flush. Can contain a single region or multiple regions.
333    pub region_ids: Vec<RegionId>,
334    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
335    #[serde(default)]
336    pub strategy: FlushStrategy,
337    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
338    #[serde(default)]
339    pub error_strategy: FlushErrorStrategy,
340}
341
342impl Display for FlushRegions {
343    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
344        write!(
345            f,
346            "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
347            self.region_ids, self.strategy, self.error_strategy
348        )
349    }
350}
351
352impl FlushRegions {
353    /// Create synchronous single-region flush
354    pub fn sync_single(region_id: RegionId) -> Self {
355        Self {
356            region_ids: vec![region_id],
357            strategy: FlushStrategy::Sync,
358            error_strategy: FlushErrorStrategy::FailFast,
359        }
360    }
361
362    /// Create asynchronous batch flush (fire-and-forget)
363    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
364        Self {
365            region_ids,
366            strategy: FlushStrategy::Async,
367            error_strategy: FlushErrorStrategy::TryAll,
368        }
369    }
370
371    /// Create synchronous batch flush with error strategy
372    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
373        Self {
374            region_ids,
375            strategy: FlushStrategy::Sync,
376            error_strategy,
377        }
378    }
379
380    /// Check if this is a single region flush.
381    pub fn is_single_region(&self) -> bool {
382        self.region_ids.len() == 1
383    }
384
385    /// Get the single region ID if this is a single region flush.
386    pub fn single_region_id(&self) -> Option<RegionId> {
387        if self.is_single_region() {
388            self.region_ids.first().copied()
389        } else {
390            None
391        }
392    }
393
394    /// Check if this is a hint (asynchronous) operation.
395    pub fn is_hint(&self) -> bool {
396        matches!(self.strategy, FlushStrategy::Async)
397    }
398
399    /// Check if this is a synchronous operation.
400    pub fn is_sync(&self) -> bool {
401        matches!(self.strategy, FlushStrategy::Sync)
402    }
403}
404
405impl From<RegionId> for FlushRegions {
406    fn from(region_id: RegionId) -> Self {
407        Self::sync_single(region_id)
408    }
409}
410
411#[derive(Debug, Deserialize)]
412#[serde(untagged)]
413enum SingleOrMultiple<T> {
414    Single(T),
415    Multiple(Vec<T>),
416}
417
418fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
419where
420    D: Deserializer<'de>,
421    T: Deserialize<'de>,
422{
423    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
424    Ok(match helper {
425        SingleOrMultiple::Single(x) => vec![x],
426        SingleOrMultiple::Multiple(xs) => xs,
427    })
428}
429
430/// Instruction to get file references for specified regions.
431#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
432pub struct GetFileRefs {
433    /// List of region IDs to get file references from active FileHandles (in-memory).
434    pub query_regions: Vec<RegionId>,
435    /// Mapping from the source region ID (where to read the manifest) to
436    /// the target region IDs (whose file references to look for).
437    /// Key: The region ID of the manifest.
438    /// Value: The list of region IDs to find references for in that manifest.
439    pub related_regions: HashMap<RegionId, Vec<RegionId>>,
440}
441
442impl Display for GetFileRefs {
443    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
444        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
445    }
446}
447
448/// Instruction to trigger garbage collection for a region.
449#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
450pub struct GcRegions {
451    /// The region ID to perform GC on, only regions that are currently on the given datanode can be garbage collected, regions not on the datanode will report errors.
452    pub regions: Vec<RegionId>,
453    /// The file references manifest containing temporary file references.
454    pub file_refs_manifest: FileRefsManifest,
455    /// Whether to perform a full file listing to find orphan files.
456    pub full_file_listing: bool,
457}
458
459impl Display for GcRegions {
460    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
461        write!(
462            f,
463            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
464            self.regions,
465            self.file_refs_manifest.file_refs.len(),
466            self.full_file_listing
467        )
468    }
469}
470
471/// Reply for GetFileRefs instruction.
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
473pub struct GetFileRefsReply {
474    /// The file references manifest.
475    pub file_refs_manifest: FileRefsManifest,
476    /// Whether the operation was successful.
477    pub success: bool,
478    /// Error message if any.
479    pub error: Option<String>,
480}
481
482impl Display for GetFileRefsReply {
483    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
484        write!(
485            f,
486            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
487            self.success,
488            self.file_refs_manifest.file_refs.len(),
489            self.error
490        )
491    }
492}
493
494/// Reply for GC instruction.
495#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
496pub struct GcRegionsReply {
497    pub result: Result<GcReport, String>,
498}
499
500impl Display for GcRegionsReply {
501    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
502        write!(
503            f,
504            "GcReply(result={})",
505            match &self.result {
506                Ok(report) => format!(
507                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
508                    report.deleted_files.len(),
509                    report.need_retry_regions.len()
510                ),
511                Err(err) => format!("Err({})", err),
512            }
513        )
514    }
515}
516
517#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
518pub struct EnterStagingRegion {
519    pub region_id: RegionId,
520    pub partition_expr: String,
521}
522
523impl Display for EnterStagingRegion {
524    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
525        write!(
526            f,
527            "EnterStagingRegion(region_id={}, partition_expr={})",
528            self.region_id, self.partition_expr
529        )
530    }
531}
532
533#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
534pub enum Instruction {
535    /// Opens regions.
536    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
537    OpenRegions(Vec<OpenRegion>),
538    /// Closes regions.
539    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
540    CloseRegions(Vec<RegionIdent>),
541    /// Upgrades regions.
542    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
543    UpgradeRegions(Vec<UpgradeRegion>),
544    #[serde(
545        deserialize_with = "single_or_multiple_from",
546        alias = "DowngradeRegion"
547    )]
548    /// Downgrades regions.
549    DowngradeRegions(Vec<DowngradeRegion>),
550    /// Invalidates batch cache.
551    InvalidateCaches(Vec<CacheIdent>),
552    /// Flushes regions.
553    FlushRegions(FlushRegions),
554    /// Gets file references for regions.
555    GetFileRefs(GetFileRefs),
556    /// Triggers garbage collection for a region.
557    GcRegions(GcRegions),
558    /// Temporary suspend serving reads or writes
559    Suspend,
560    /// Makes regions enter staging state.
561    EnterStagingRegions(Vec<EnterStagingRegion>),
562}
563
564impl Instruction {
565    /// Converts the instruction into a vector of [OpenRegion].
566    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
567        match self {
568            Self::OpenRegions(open_regions) => Some(open_regions),
569            _ => None,
570        }
571    }
572
573    /// Converts the instruction into a vector of [RegionIdent].
574    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
575        match self {
576            Self::CloseRegions(close_regions) => Some(close_regions),
577            _ => None,
578        }
579    }
580
581    /// Converts the instruction into a [FlushRegions].
582    pub fn into_flush_regions(self) -> Option<FlushRegions> {
583        match self {
584            Self::FlushRegions(flush_regions) => Some(flush_regions),
585            _ => None,
586        }
587    }
588
589    /// Converts the instruction into a [DowngradeRegion].
590    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
591        match self {
592            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
593            _ => None,
594        }
595    }
596
597    /// Converts the instruction into a [UpgradeRegion].
598    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
599        match self {
600            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
601            _ => None,
602        }
603    }
604
605    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
606        match self {
607            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
608            _ => None,
609        }
610    }
611
612    pub fn into_gc_regions(self) -> Option<GcRegions> {
613        match self {
614            Self::GcRegions(gc_regions) => Some(gc_regions),
615            _ => None,
616        }
617    }
618
619    pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
620        match self {
621            Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
622            _ => None,
623        }
624    }
625}
626
627/// The reply of [UpgradeRegion].
628#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
629pub struct UpgradeRegionReply {
630    /// The [RegionId].
631    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
632    #[serde(default)]
633    pub region_id: RegionId,
634    /// Returns true if `last_entry_id` has been replayed to the latest.
635    pub ready: bool,
636    /// Indicates whether the region exists.
637    pub exists: bool,
638    /// Returns error if any.
639    pub error: Option<String>,
640}
641
642impl Display for UpgradeRegionReply {
643    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
644        write!(
645            f,
646            "(ready={}, exists={}, error={:?})",
647            self.ready, self.exists, self.error
648        )
649    }
650}
651
652#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
653pub struct DowngradeRegionsReply {
654    pub replies: Vec<DowngradeRegionReply>,
655}
656
657impl DowngradeRegionsReply {
658    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
659        Self { replies }
660    }
661
662    pub fn single(reply: DowngradeRegionReply) -> Self {
663        Self::new(vec![reply])
664    }
665}
666
667#[derive(Deserialize)]
668#[serde(untagged)]
669enum DowngradeRegionsCompat {
670    Single(DowngradeRegionReply),
671    Multiple(DowngradeRegionsReply),
672}
673
674fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
675where
676    D: Deserializer<'de>,
677{
678    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
679    Ok(match helper {
680        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
681        DowngradeRegionsCompat::Multiple(reply) => reply,
682    })
683}
684
685#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
686pub struct UpgradeRegionsReply {
687    pub replies: Vec<UpgradeRegionReply>,
688}
689
690impl UpgradeRegionsReply {
691    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
692        Self { replies }
693    }
694
695    pub fn single(reply: UpgradeRegionReply) -> Self {
696        Self::new(vec![reply])
697    }
698}
699
700#[derive(Deserialize)]
701#[serde(untagged)]
702enum UpgradeRegionsCompat {
703    Single(UpgradeRegionReply),
704    Multiple(UpgradeRegionsReply),
705}
706
707fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
708where
709    D: Deserializer<'de>,
710{
711    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
712    Ok(match helper {
713        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
714        UpgradeRegionsCompat::Multiple(reply) => reply,
715    })
716}
717
718#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
719pub struct EnterStagingRegionReply {
720    pub region_id: RegionId,
721    /// Returns true if the region is under the new region rule.
722    pub ready: bool,
723    /// Indicates whether the region exists.
724    pub exists: bool,
725    /// Return error if any during the operation.
726    pub error: Option<String>,
727}
728
729#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
730pub struct EnterStagingRegionsReply {
731    pub replies: Vec<EnterStagingRegionReply>,
732}
733
734impl EnterStagingRegionsReply {
735    pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
736        Self { replies }
737    }
738}
739
740#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
741#[serde(tag = "type", rename_all = "snake_case")]
742pub enum InstructionReply {
743    #[serde(alias = "open_region")]
744    OpenRegions(SimpleReply),
745    #[serde(alias = "close_region")]
746    CloseRegions(SimpleReply),
747    #[serde(
748        deserialize_with = "upgrade_regions_compat_from",
749        alias = "upgrade_region"
750    )]
751    UpgradeRegions(UpgradeRegionsReply),
752    #[serde(
753        alias = "downgrade_region",
754        deserialize_with = "downgrade_regions_compat_from"
755    )]
756    DowngradeRegions(DowngradeRegionsReply),
757    FlushRegions(FlushRegionReply),
758    GetFileRefs(GetFileRefsReply),
759    GcRegions(GcRegionsReply),
760    EnterStagingRegions(EnterStagingRegionsReply),
761}
762
763impl Display for InstructionReply {
764    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
765        match self {
766            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
767            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
768            Self::UpgradeRegions(reply) => {
769                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
770            }
771            Self::DowngradeRegions(reply) => {
772                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
773            }
774            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
775            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
776            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
777            Self::EnterStagingRegions(reply) => {
778                write!(
779                    f,
780                    "InstructionReply::EnterStagingRegions({:?})",
781                    reply.replies
782                )
783            }
784        }
785    }
786}
787
788#[cfg(any(test, feature = "testing"))]
789impl InstructionReply {
790    pub fn expect_close_regions_reply(self) -> SimpleReply {
791        match self {
792            Self::CloseRegions(reply) => reply,
793            _ => panic!("Expected CloseRegions reply"),
794        }
795    }
796
797    pub fn expect_open_regions_reply(self) -> SimpleReply {
798        match self {
799            Self::OpenRegions(reply) => reply,
800            _ => panic!("Expected OpenRegions reply"),
801        }
802    }
803
804    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
805        match self {
806            Self::UpgradeRegions(reply) => reply.replies,
807            _ => panic!("Expected UpgradeRegion reply"),
808        }
809    }
810
811    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
812        match self {
813            Self::DowngradeRegions(reply) => reply.replies,
814            _ => panic!("Expected DowngradeRegion reply"),
815        }
816    }
817
818    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
819        match self {
820            Self::FlushRegions(reply) => reply,
821            _ => panic!("Expected FlushRegions reply"),
822        }
823    }
824
825    pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
826        match self {
827            Self::EnterStagingRegions(reply) => reply.replies,
828            _ => panic!("Expected EnterStagingRegion reply"),
829        }
830    }
831}
832
833#[cfg(test)]
834mod tests {
835    use std::collections::HashSet;
836
837    use store_api::storage::{FileId, FileRef};
838
839    use super::*;
840
841    #[test]
842    fn test_serialize_instruction() {
843        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
844            RegionIdent {
845                datanode_id: 2,
846                table_id: 1024,
847                region_number: 1,
848                engine: "mito2".to_string(),
849            },
850            "test/foo",
851            HashMap::new(),
852            HashMap::new(),
853            false,
854        )]);
855
856        let serialized = serde_json::to_string(&open_region).unwrap();
857        assert_eq!(
858            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
859            serialized
860        );
861
862        let close_region = Instruction::CloseRegions(vec![RegionIdent {
863            datanode_id: 2,
864            table_id: 1024,
865            region_number: 1,
866            engine: "mito2".to_string(),
867        }]);
868
869        let serialized = serde_json::to_string(&close_region).unwrap();
870        assert_eq!(
871            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
872            serialized
873        );
874
875        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
876            region_id: RegionId::new(1024, 1),
877            last_entry_id: None,
878            metadata_last_entry_id: None,
879            replay_timeout: Duration::from_millis(1000),
880            location_id: None,
881            replay_entry_id: None,
882            metadata_replay_entry_id: None,
883        }]);
884
885        let serialized = serde_json::to_string(&upgrade_region).unwrap();
886        assert_eq!(
887            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
888            serialized
889        );
890    }
891
892    #[test]
893    fn test_serialize_instruction_reply() {
894        let downgrade_region_reply = InstructionReply::DowngradeRegions(
895            DowngradeRegionsReply::single(DowngradeRegionReply {
896                region_id: RegionId::new(1024, 1),
897                last_entry_id: None,
898                metadata_last_entry_id: None,
899                exists: true,
900                error: None,
901            }),
902        );
903
904        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
905        assert_eq!(
906            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
907            serialized
908        );
909
910        let upgrade_region_reply =
911            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
912                region_id: RegionId::new(1024, 1),
913                ready: true,
914                exists: true,
915                error: None,
916            }));
917        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
918        assert_eq!(
919            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
920            serialized
921        );
922    }
923
924    #[test]
925    fn test_deserialize_instruction() {
926        // legacy open region instruction
927        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
928        let open_region_instruction: Instruction =
929            serde_json::from_str(open_region_instruction).unwrap();
930        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
931            RegionIdent {
932                datanode_id: 2,
933                table_id: 1024,
934                region_number: 1,
935                engine: "mito2".to_string(),
936            },
937            "test/foo",
938            HashMap::new(),
939            HashMap::new(),
940            false,
941        )]);
942        assert_eq!(open_region_instruction, open_region);
943
944        // legacy close region instruction
945        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
946        let close_region_instruction: Instruction =
947            serde_json::from_str(close_region_instruction).unwrap();
948        let close_region = Instruction::CloseRegions(vec![RegionIdent {
949            datanode_id: 2,
950            table_id: 1024,
951            region_number: 1,
952            engine: "mito2".to_string(),
953        }]);
954        assert_eq!(close_region_instruction, close_region);
955
956        // legacy downgrade region instruction
957        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
958        let downgrade_region_instruction: Instruction =
959            serde_json::from_str(downgrade_region_instruction).unwrap();
960        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
961            region_id: RegionId::new(1024, 1),
962            flush_timeout: Some(Duration::from_millis(1000)),
963        }]);
964        assert_eq!(downgrade_region_instruction, downgrade_region);
965
966        // legacy upgrade region instruction
967        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
968        let upgrade_region_instruction: Instruction =
969            serde_json::from_str(upgrade_region_instruction).unwrap();
970        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
971            region_id: RegionId::new(1024, 1),
972            last_entry_id: None,
973            metadata_last_entry_id: None,
974            replay_timeout: Duration::from_millis(1000),
975            location_id: None,
976            replay_entry_id: None,
977            metadata_replay_entry_id: None,
978        }]);
979        assert_eq!(upgrade_region_instruction, upgrade_region);
980    }
981
982    #[test]
983    fn test_deserialize_instruction_reply() {
984        // legacy close region reply
985        let close_region_instruction_reply =
986            r#"{"result":true,"error":null,"type":"close_region"}"#;
987        let close_region_instruction_reply: InstructionReply =
988            serde_json::from_str(close_region_instruction_reply).unwrap();
989        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
990            result: true,
991            error: None,
992        });
993        assert_eq!(close_region_instruction_reply, close_region_reply);
994
995        // legacy open region reply
996        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
997        let open_region_instruction_reply: InstructionReply =
998            serde_json::from_str(open_region_instruction_reply).unwrap();
999        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
1000            result: true,
1001            error: None,
1002        });
1003        assert_eq!(open_region_instruction_reply, open_region_reply);
1004
1005        // legacy downgrade region reply
1006        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
1007        let downgrade_region_instruction_reply: InstructionReply =
1008            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
1009        let downgrade_region_reply = InstructionReply::DowngradeRegions(
1010            DowngradeRegionsReply::single(DowngradeRegionReply {
1011                region_id: RegionId::new(1024, 1),
1012                last_entry_id: None,
1013                metadata_last_entry_id: None,
1014                exists: true,
1015                error: None,
1016            }),
1017        );
1018        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
1019
1020        // legacy upgrade region reply
1021        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
1022        let upgrade_region_instruction_reply: InstructionReply =
1023            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
1024        let upgrade_region_reply =
1025            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
1026                region_id: RegionId::new(1024, 1),
1027                ready: true,
1028                exists: true,
1029                error: None,
1030            }));
1031        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
1032    }
1033
1034    #[derive(Debug, Clone, Serialize, Deserialize)]
1035    struct LegacyOpenRegion {
1036        region_ident: RegionIdent,
1037        region_storage_path: String,
1038        region_options: HashMap<String, String>,
1039    }
1040
1041    #[test]
1042    fn test_compatible_serialize_open_region() {
1043        let region_ident = RegionIdent {
1044            datanode_id: 2,
1045            table_id: 1024,
1046            region_number: 1,
1047            engine: "mito2".to_string(),
1048        };
1049        let region_storage_path = "test/foo".to_string();
1050        let region_options = HashMap::from([
1051            ("a".to_string(), "aa".to_string()),
1052            ("b".to_string(), "bb".to_string()),
1053        ]);
1054
1055        // Serialize a legacy OpenRegion.
1056        let legacy_open_region = LegacyOpenRegion {
1057            region_ident: region_ident.clone(),
1058            region_storage_path: region_storage_path.clone(),
1059            region_options: region_options.clone(),
1060        };
1061        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
1062
1063        // Deserialize to OpenRegion.
1064        let deserialized = serde_json::from_str(&serialized).unwrap();
1065        let expected = OpenRegion {
1066            region_ident,
1067            region_storage_path,
1068            region_options,
1069            region_wal_options: HashMap::new(),
1070            skip_wal_replay: false,
1071        };
1072        assert_eq!(expected, deserialized);
1073    }
1074
1075    #[test]
1076    fn test_flush_regions_creation() {
1077        let region_id = RegionId::new(1024, 1);
1078
1079        // Single region sync flush
1080        let single_sync = FlushRegions::sync_single(region_id);
1081        assert_eq!(single_sync.region_ids, vec![region_id]);
1082        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1083        assert!(!single_sync.is_hint());
1084        assert!(single_sync.is_sync());
1085        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1086        assert!(single_sync.is_single_region());
1087        assert_eq!(single_sync.single_region_id(), Some(region_id));
1088
1089        // Batch async flush (hint)
1090        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1091        let batch_async = FlushRegions::async_batch(region_ids.clone());
1092        assert_eq!(batch_async.region_ids, region_ids);
1093        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1094        assert!(batch_async.is_hint());
1095        assert!(!batch_async.is_sync());
1096        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1097        assert!(!batch_async.is_single_region());
1098        assert_eq!(batch_async.single_region_id(), None);
1099
1100        // Batch sync flush
1101        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1102        assert_eq!(batch_sync.region_ids, region_ids);
1103        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1104        assert!(!batch_sync.is_hint());
1105        assert!(batch_sync.is_sync());
1106        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1107    }
1108
1109    #[test]
1110    fn test_flush_regions_conversion() {
1111        let region_id = RegionId::new(1024, 1);
1112
1113        let from_region_id: FlushRegions = region_id.into();
1114        assert_eq!(from_region_id.region_ids, vec![region_id]);
1115        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1116        assert!(!from_region_id.is_hint());
1117        assert!(from_region_id.is_sync());
1118
1119        // Test default construction
1120        let flush_regions = FlushRegions {
1121            region_ids: vec![region_id],
1122            strategy: FlushStrategy::Async,
1123            error_strategy: FlushErrorStrategy::TryAll,
1124        };
1125        assert_eq!(flush_regions.region_ids, vec![region_id]);
1126        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1127        assert!(flush_regions.is_hint());
1128        assert!(!flush_regions.is_sync());
1129    }
1130
1131    #[test]
1132    fn test_flush_region_reply() {
1133        let region_id = RegionId::new(1024, 1);
1134
1135        // Successful single region reply
1136        let success_reply = FlushRegionReply::success_single(region_id);
1137        assert!(success_reply.overall_success);
1138        assert_eq!(success_reply.results.len(), 1);
1139        assert_eq!(success_reply.results[0].0, region_id);
1140        assert!(success_reply.results[0].1.is_ok());
1141
1142        // Failed single region reply
1143        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1144        assert!(!error_reply.overall_success);
1145        assert_eq!(error_reply.results.len(), 1);
1146        assert_eq!(error_reply.results[0].0, region_id);
1147        assert!(error_reply.results[0].1.is_err());
1148
1149        // Batch reply
1150        let region_id2 = RegionId::new(1024, 2);
1151        let results = vec![
1152            (region_id, Ok(())),
1153            (region_id2, Err("flush failed".to_string())),
1154        ];
1155        let batch_reply = FlushRegionReply::from_results(results);
1156        assert!(!batch_reply.overall_success);
1157        assert_eq!(batch_reply.results.len(), 2);
1158
1159        // Conversion to SimpleReply
1160        let simple_reply = batch_reply.to_simple_reply();
1161        assert!(!simple_reply.result);
1162        assert!(simple_reply.error.is_some());
1163        assert!(simple_reply.error.unwrap().contains("flush failed"));
1164    }
1165
1166    #[test]
1167    fn test_serialize_flush_regions_instruction() {
1168        let region_id = RegionId::new(1024, 1);
1169        let flush_regions = FlushRegions::sync_single(region_id);
1170        let instruction = Instruction::FlushRegions(flush_regions.clone());
1171
1172        let serialized = serde_json::to_string(&instruction).unwrap();
1173        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1174
1175        match deserialized {
1176            Instruction::FlushRegions(fr) => {
1177                assert_eq!(fr.region_ids, vec![region_id]);
1178                assert_eq!(fr.strategy, FlushStrategy::Sync);
1179                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1180            }
1181            _ => panic!("Expected FlushRegions instruction"),
1182        }
1183    }
1184
1185    #[test]
1186    fn test_serialize_flush_regions_batch_instruction() {
1187        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1188        let flush_regions =
1189            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1190        let instruction = Instruction::FlushRegions(flush_regions);
1191
1192        let serialized = serde_json::to_string(&instruction).unwrap();
1193        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1194
1195        match deserialized {
1196            Instruction::FlushRegions(fr) => {
1197                assert_eq!(fr.region_ids, region_ids);
1198                assert_eq!(fr.strategy, FlushStrategy::Sync);
1199                assert!(!fr.is_hint());
1200                assert!(fr.is_sync());
1201                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1202            }
1203            _ => panic!("Expected FlushRegions instruction"),
1204        }
1205    }
1206
1207    #[test]
1208    fn test_serialize_get_file_refs_instruction_reply() {
1209        let mut manifest = FileRefsManifest::default();
1210        let r0 = RegionId::new(1024, 1);
1211        let r1 = RegionId::new(1024, 2);
1212        manifest.file_refs.insert(
1213            r0,
1214            HashSet::from([FileRef::new(r0, FileId::random(), None)]),
1215        );
1216        manifest.file_refs.insert(
1217            r1,
1218            HashSet::from([FileRef::new(r1, FileId::random(), None)]),
1219        );
1220        manifest.manifest_version.insert(r0, 10);
1221        manifest.manifest_version.insert(r1, 20);
1222
1223        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1224            file_refs_manifest: manifest,
1225            success: true,
1226            error: None,
1227        });
1228
1229        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1230        let deserialized = serde_json::from_str(&serialized).unwrap();
1231
1232        assert_eq!(instruction_reply, deserialized);
1233    }
1234}