common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// The [RegionId].
59    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
60    #[serde(default)]
61    pub region_id: RegionId,
62    /// Returns the `last_entry_id` if available.
63    pub last_entry_id: Option<u64>,
64    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
65    pub metadata_last_entry_id: Option<u64>,
66    /// Indicates whether the region exists.
67    pub exists: bool,
68    /// Return error if any during the operation.
69    pub error: Option<String>,
70}
71
72impl Display for DowngradeRegionReply {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        write!(
75            f,
76            "(last_entry_id={:?}, exists={}, error={:?})",
77            self.last_entry_id, self.exists, self.error
78        )
79    }
80}
81
82#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
83pub struct SimpleReply {
84    pub result: bool,
85    pub error: Option<String>,
86}
87
88/// Reply for flush region operations with support for batch results.
89#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
90pub struct FlushRegionReply {
91    /// Results for each region that was attempted to be flushed.
92    /// For single region flushes, this will contain one result.
93    /// For batch flushes, this contains results for all attempted regions.
94    pub results: Vec<(RegionId, Result<(), String>)>,
95    /// Overall success: true if all regions were flushed successfully.
96    pub overall_success: bool,
97}
98
99impl FlushRegionReply {
100    /// Create a successful single region reply.
101    pub fn success_single(region_id: RegionId) -> Self {
102        Self {
103            results: vec![(region_id, Ok(()))],
104            overall_success: true,
105        }
106    }
107
108    /// Create a failed single region reply.
109    pub fn error_single(region_id: RegionId, error: String) -> Self {
110        Self {
111            results: vec![(region_id, Err(error))],
112            overall_success: false,
113        }
114    }
115
116    /// Create a batch reply from individual results.
117    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
118        let overall_success = results.iter().all(|(_, result)| result.is_ok());
119        Self {
120            results,
121            overall_success,
122        }
123    }
124
125    /// Convert to SimpleReply for backward compatibility.
126    pub fn to_simple_reply(&self) -> SimpleReply {
127        if self.overall_success {
128            SimpleReply {
129                result: true,
130                error: None,
131            }
132        } else {
133            let errors: Vec<String> = self
134                .results
135                .iter()
136                .filter_map(|(region_id, result)| {
137                    result
138                        .as_ref()
139                        .err()
140                        .map(|err| format!("{}: {}", region_id, err))
141                })
142                .collect();
143            SimpleReply {
144                result: false,
145                error: Some(errors.join("; ")),
146            }
147        }
148    }
149}
150
151impl Display for SimpleReply {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(f, "(result={}, error={:?})", self.result, self.error)
154    }
155}
156
157impl Display for FlushRegionReply {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        let results_str = self
160            .results
161            .iter()
162            .map(|(region_id, result)| match result {
163                Ok(()) => format!("{}:OK", region_id),
164                Err(err) => format!("{}:ERR({})", region_id, err),
165            })
166            .collect::<Vec<_>>()
167            .join(", ");
168        write!(
169            f,
170            "(overall_success={}, results=[{}])",
171            self.overall_success, results_str
172        )
173    }
174}
175
176impl Display for OpenRegion {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "OpenRegion(region_ident={}, region_storage_path={})",
181            self.region_ident, self.region_storage_path
182        )
183    }
184}
185
186#[serde_with::serde_as]
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub struct OpenRegion {
189    pub region_ident: RegionIdent,
190    pub region_storage_path: String,
191    pub region_options: HashMap<String, String>,
192    #[serde(default)]
193    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
194    pub region_wal_options: HashMap<RegionNumber, String>,
195    #[serde(default)]
196    pub skip_wal_replay: bool,
197}
198
199impl OpenRegion {
200    pub fn new(
201        region_ident: RegionIdent,
202        path: &str,
203        region_options: HashMap<String, String>,
204        region_wal_options: HashMap<RegionNumber, String>,
205        skip_wal_replay: bool,
206    ) -> Self {
207        Self {
208            region_ident,
209            region_storage_path: path.to_string(),
210            region_options,
211            region_wal_options,
212            skip_wal_replay,
213        }
214    }
215}
216
217/// The instruction of downgrading leader region.
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
219pub struct DowngradeRegion {
220    /// The [RegionId].
221    pub region_id: RegionId,
222    /// The timeout of waiting for flush the region.
223    ///
224    /// `None` stands for don't flush before downgrading the region.
225    #[serde(default)]
226    pub flush_timeout: Option<Duration>,
227}
228
229impl Display for DowngradeRegion {
230    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231        write!(
232            f,
233            "DowngradeRegion(region_id={}, flush_timeout={:?})",
234            self.region_id, self.flush_timeout,
235        )
236    }
237}
238
239/// Upgrades a follower region to leader region.
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
241pub struct UpgradeRegion {
242    /// The [RegionId].
243    pub region_id: RegionId,
244    /// The `last_entry_id` of old leader region.
245    pub last_entry_id: Option<u64>,
246    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
247    pub metadata_last_entry_id: Option<u64>,
248    /// The timeout of waiting for a wal replay.
249    ///
250    /// `None` stands for no wait,
251    /// it's helpful to verify whether the leader region is ready.
252    #[serde(with = "humantime_serde")]
253    pub replay_timeout: Duration,
254    /// The hint for replaying memtable.
255    #[serde(default)]
256    pub location_id: Option<u64>,
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub replay_entry_id: Option<u64>,
259    #[serde(default, skip_serializing_if = "Option::is_none")]
260    pub metadata_replay_entry_id: Option<u64>,
261}
262
263impl UpgradeRegion {
264    /// Sets the replay entry id.
265    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
266        self.replay_entry_id = replay_entry_id;
267        self
268    }
269
270    /// Sets the metadata replay entry id.
271    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
272        self.metadata_replay_entry_id = metadata_replay_entry_id;
273        self
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278/// The identifier of cache.
279pub enum CacheIdent {
280    FlowId(FlowId),
281    /// Indicate change of address of flownode.
282    FlowNodeAddressChange(u64),
283    FlowName(FlowName),
284    TableId(TableId),
285    TableName(TableName),
286    SchemaName(SchemaName),
287    CreateFlow(CreateFlow),
288    DropFlow(DropFlow),
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
292pub struct CreateFlow {
293    /// The unique identifier for the flow.
294    pub flow_id: FlowId,
295    pub source_table_ids: Vec<TableId>,
296    /// Mapping of flow partition to peer information
297    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
301pub struct DropFlow {
302    pub flow_id: FlowId,
303    pub source_table_ids: Vec<TableId>,
304    /// Mapping of flow partition to flownode id
305    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
306}
307
308/// Strategy for executing flush operations.
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
310pub enum FlushStrategy {
311    /// Synchronous operation that waits for completion and expects a reply
312    #[default]
313    Sync,
314    /// Asynchronous hint operation (fire-and-forget, no reply expected)
315    Async,
316}
317
318/// Error handling strategy for batch flush operations.
319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
320pub enum FlushErrorStrategy {
321    /// Abort on first error (fail-fast)
322    #[default]
323    FailFast,
324    /// Attempt to flush all regions and collect all errors
325    TryAll,
326}
327
328/// Unified flush instruction supporting both single and batch operations
329/// with configurable execution strategies and error handling.
330#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
331pub struct FlushRegions {
332    /// List of region IDs to flush. Can contain a single region or multiple regions.
333    pub region_ids: Vec<RegionId>,
334    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
335    #[serde(default)]
336    pub strategy: FlushStrategy,
337    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
338    #[serde(default)]
339    pub error_strategy: FlushErrorStrategy,
340}
341
342impl Display for FlushRegions {
343    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
344        write!(
345            f,
346            "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
347            self.region_ids, self.strategy, self.error_strategy
348        )
349    }
350}
351
352impl FlushRegions {
353    /// Create synchronous single-region flush
354    pub fn sync_single(region_id: RegionId) -> Self {
355        Self {
356            region_ids: vec![region_id],
357            strategy: FlushStrategy::Sync,
358            error_strategy: FlushErrorStrategy::FailFast,
359        }
360    }
361
362    /// Create asynchronous batch flush (fire-and-forget)
363    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
364        Self {
365            region_ids,
366            strategy: FlushStrategy::Async,
367            error_strategy: FlushErrorStrategy::TryAll,
368        }
369    }
370
371    /// Create synchronous batch flush with error strategy
372    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
373        Self {
374            region_ids,
375            strategy: FlushStrategy::Sync,
376            error_strategy,
377        }
378    }
379
380    /// Check if this is a single region flush.
381    pub fn is_single_region(&self) -> bool {
382        self.region_ids.len() == 1
383    }
384
385    /// Get the single region ID if this is a single region flush.
386    pub fn single_region_id(&self) -> Option<RegionId> {
387        if self.is_single_region() {
388            self.region_ids.first().copied()
389        } else {
390            None
391        }
392    }
393
394    /// Check if this is a hint (asynchronous) operation.
395    pub fn is_hint(&self) -> bool {
396        matches!(self.strategy, FlushStrategy::Async)
397    }
398
399    /// Check if this is a synchronous operation.
400    pub fn is_sync(&self) -> bool {
401        matches!(self.strategy, FlushStrategy::Sync)
402    }
403}
404
405impl From<RegionId> for FlushRegions {
406    fn from(region_id: RegionId) -> Self {
407        Self::sync_single(region_id)
408    }
409}
410
411#[derive(Debug, Deserialize)]
412#[serde(untagged)]
413enum SingleOrMultiple<T> {
414    Single(T),
415    Multiple(Vec<T>),
416}
417
418fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
419where
420    D: Deserializer<'de>,
421    T: Deserialize<'de>,
422{
423    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
424    Ok(match helper {
425        SingleOrMultiple::Single(x) => vec![x],
426        SingleOrMultiple::Multiple(xs) => xs,
427    })
428}
429
430/// Instruction to get file references for specified regions.
431#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
432pub struct GetFileRefs {
433    /// List of region IDs to get file references from active FileHandles (in-memory).
434    pub query_regions: Vec<RegionId>,
435    /// Mapping from the source region ID (where to read the manifest) to
436    /// the target region IDs (whose file references to look for).
437    /// Key: The region ID of the manifest.
438    /// Value: The list of region IDs to find references for in that manifest.
439    pub related_regions: HashMap<RegionId, Vec<RegionId>>,
440}
441
442impl Display for GetFileRefs {
443    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
444        write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
445    }
446}
447
448/// Instruction to trigger garbage collection for a region.
449#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
450pub struct GcRegions {
451    /// The region ID to perform GC on, only regions that are currently on the given datanode can be garbage collected, regions not on the datanode will report errors.
452    pub regions: Vec<RegionId>,
453    /// The file references manifest containing temporary file references.
454    pub file_refs_manifest: FileRefsManifest,
455    /// Whether to perform a full file listing to find orphan files.
456    pub full_file_listing: bool,
457}
458
459impl Display for GcRegions {
460    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
461        write!(
462            f,
463            "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
464            self.regions,
465            self.file_refs_manifest.file_refs.len(),
466            self.full_file_listing
467        )
468    }
469}
470
471/// Reply for GetFileRefs instruction.
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
473pub struct GetFileRefsReply {
474    /// The file references manifest.
475    pub file_refs_manifest: FileRefsManifest,
476    /// Whether the operation was successful.
477    pub success: bool,
478    /// Error message if any.
479    pub error: Option<String>,
480}
481
482impl Display for GetFileRefsReply {
483    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
484        write!(
485            f,
486            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
487            self.success,
488            self.file_refs_manifest.file_refs.len(),
489            self.error
490        )
491    }
492}
493
494/// Reply for GC instruction.
495#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
496pub struct GcRegionsReply {
497    pub result: Result<GcReport, String>,
498}
499
500impl Display for GcRegionsReply {
501    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
502        write!(
503            f,
504            "GcReply(result={})",
505            match &self.result {
506                Ok(report) => format!(
507                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
508                    report.deleted_files.len(),
509                    report.need_retry_regions.len()
510                ),
511                Err(err) => format!("Err({})", err),
512            }
513        )
514    }
515}
516
517#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
518pub enum Instruction {
519    /// Opens regions.
520    #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
521    OpenRegions(Vec<OpenRegion>),
522    /// Closes regions.
523    #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
524    CloseRegions(Vec<RegionIdent>),
525    /// Upgrades regions.
526    #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
527    UpgradeRegions(Vec<UpgradeRegion>),
528    #[serde(
529        deserialize_with = "single_or_multiple_from",
530        alias = "DowngradeRegion"
531    )]
532    /// Downgrades regions.
533    DowngradeRegions(Vec<DowngradeRegion>),
534    /// Invalidates batch cache.
535    InvalidateCaches(Vec<CacheIdent>),
536    /// Flushes regions.
537    FlushRegions(FlushRegions),
538    /// Gets file references for regions.
539    GetFileRefs(GetFileRefs),
540    /// Triggers garbage collection for a region.
541    GcRegions(GcRegions),
542}
543
544impl Instruction {
545    /// Converts the instruction into a vector of [OpenRegion].
546    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
547        match self {
548            Self::OpenRegions(open_regions) => Some(open_regions),
549            _ => None,
550        }
551    }
552
553    /// Converts the instruction into a vector of [RegionIdent].
554    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
555        match self {
556            Self::CloseRegions(close_regions) => Some(close_regions),
557            _ => None,
558        }
559    }
560
561    /// Converts the instruction into a [FlushRegions].
562    pub fn into_flush_regions(self) -> Option<FlushRegions> {
563        match self {
564            Self::FlushRegions(flush_regions) => Some(flush_regions),
565            _ => None,
566        }
567    }
568
569    /// Converts the instruction into a [DowngradeRegion].
570    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
571        match self {
572            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
573            _ => None,
574        }
575    }
576
577    /// Converts the instruction into a [UpgradeRegion].
578    pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
579        match self {
580            Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
581            _ => None,
582        }
583    }
584
585    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
586        match self {
587            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
588            _ => None,
589        }
590    }
591
592    pub fn into_gc_regions(self) -> Option<GcRegions> {
593        match self {
594            Self::GcRegions(gc_regions) => Some(gc_regions),
595            _ => None,
596        }
597    }
598}
599
600/// The reply of [UpgradeRegion].
601#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
602pub struct UpgradeRegionReply {
603    /// The [RegionId].
604    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
605    #[serde(default)]
606    pub region_id: RegionId,
607    /// Returns true if `last_entry_id` has been replayed to the latest.
608    pub ready: bool,
609    /// Indicates whether the region exists.
610    pub exists: bool,
611    /// Returns error if any.
612    pub error: Option<String>,
613}
614
615impl Display for UpgradeRegionReply {
616    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
617        write!(
618            f,
619            "(ready={}, exists={}, error={:?})",
620            self.ready, self.exists, self.error
621        )
622    }
623}
624
625#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
626pub struct DowngradeRegionsReply {
627    pub replies: Vec<DowngradeRegionReply>,
628}
629
630impl DowngradeRegionsReply {
631    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
632        Self { replies }
633    }
634
635    pub fn single(reply: DowngradeRegionReply) -> Self {
636        Self::new(vec![reply])
637    }
638}
639
640#[derive(Deserialize)]
641#[serde(untagged)]
642enum DowngradeRegionsCompat {
643    Single(DowngradeRegionReply),
644    Multiple(DowngradeRegionsReply),
645}
646
647fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
648where
649    D: Deserializer<'de>,
650{
651    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
652    Ok(match helper {
653        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
654        DowngradeRegionsCompat::Multiple(reply) => reply,
655    })
656}
657
658#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
659pub struct UpgradeRegionsReply {
660    pub replies: Vec<UpgradeRegionReply>,
661}
662
663impl UpgradeRegionsReply {
664    pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
665        Self { replies }
666    }
667
668    pub fn single(reply: UpgradeRegionReply) -> Self {
669        Self::new(vec![reply])
670    }
671}
672
673#[derive(Deserialize)]
674#[serde(untagged)]
675enum UpgradeRegionsCompat {
676    Single(UpgradeRegionReply),
677    Multiple(UpgradeRegionsReply),
678}
679
680fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
681where
682    D: Deserializer<'de>,
683{
684    let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
685    Ok(match helper {
686        UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
687        UpgradeRegionsCompat::Multiple(reply) => reply,
688    })
689}
690
691#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
692#[serde(tag = "type", rename_all = "snake_case")]
693pub enum InstructionReply {
694    #[serde(alias = "open_region")]
695    OpenRegions(SimpleReply),
696    #[serde(alias = "close_region")]
697    CloseRegions(SimpleReply),
698    #[serde(
699        deserialize_with = "upgrade_regions_compat_from",
700        alias = "upgrade_region"
701    )]
702    UpgradeRegions(UpgradeRegionsReply),
703    #[serde(
704        alias = "downgrade_region",
705        deserialize_with = "downgrade_regions_compat_from"
706    )]
707    DowngradeRegions(DowngradeRegionsReply),
708    FlushRegions(FlushRegionReply),
709    GetFileRefs(GetFileRefsReply),
710    GcRegions(GcRegionsReply),
711}
712
713impl Display for InstructionReply {
714    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
715        match self {
716            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
717            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
718            Self::UpgradeRegions(reply) => {
719                write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
720            }
721            Self::DowngradeRegions(reply) => {
722                write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
723            }
724            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
725            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
726            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
727        }
728    }
729}
730
/// Test-only helpers that unwrap a reply of a known variant.
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
    /// Returns the inner reply of [InstructionReply::CloseRegions].
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    pub fn expect_close_regions_reply(self) -> SimpleReply {
        if let Self::CloseRegions(reply) = self {
            reply
        } else {
            panic!("Expected CloseRegions reply")
        }
    }

    /// Returns the inner reply of [InstructionReply::OpenRegions].
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    pub fn expect_open_regions_reply(self) -> SimpleReply {
        if let Self::OpenRegions(reply) = self {
            reply
        } else {
            panic!("Expected OpenRegions reply")
        }
    }

    /// Returns the per-region replies of [InstructionReply::UpgradeRegions].
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
        if let Self::UpgradeRegions(batch) = self {
            batch.replies
        } else {
            panic!("Expected UpgradeRegion reply")
        }
    }

    /// Returns the per-region replies of [InstructionReply::DowngradeRegions].
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        if let Self::DowngradeRegions(batch) = self {
            batch.replies
        } else {
            panic!("Expected DowngradeRegion reply")
        }
    }

    /// Returns the inner reply of [InstructionReply::FlushRegions].
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant.
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        if let Self::FlushRegions(reply) = self {
            reply
        } else {
            panic!("Expected FlushRegions reply")
        }
    }
}
768
769#[cfg(test)]
770mod tests {
771    use std::collections::HashSet;
772
773    use store_api::storage::FileId;
774
775    use super::*;
776
777    #[test]
778    fn test_serialize_instruction() {
779        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
780            RegionIdent {
781                datanode_id: 2,
782                table_id: 1024,
783                region_number: 1,
784                engine: "mito2".to_string(),
785            },
786            "test/foo",
787            HashMap::new(),
788            HashMap::new(),
789            false,
790        )]);
791
792        let serialized = serde_json::to_string(&open_region).unwrap();
793        assert_eq!(
794            r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
795            serialized
796        );
797
798        let close_region = Instruction::CloseRegions(vec![RegionIdent {
799            datanode_id: 2,
800            table_id: 1024,
801            region_number: 1,
802            engine: "mito2".to_string(),
803        }]);
804
805        let serialized = serde_json::to_string(&close_region).unwrap();
806        assert_eq!(
807            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
808            serialized
809        );
810
811        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
812            region_id: RegionId::new(1024, 1),
813            last_entry_id: None,
814            metadata_last_entry_id: None,
815            replay_timeout: Duration::from_millis(1000),
816            location_id: None,
817            replay_entry_id: None,
818            metadata_replay_entry_id: None,
819        }]);
820
821        let serialized = serde_json::to_string(&upgrade_region).unwrap();
822        assert_eq!(
823            r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
824            serialized
825        );
826    }
827
828    #[test]
829    fn test_serialize_instruction_reply() {
830        let downgrade_region_reply = InstructionReply::DowngradeRegions(
831            DowngradeRegionsReply::single(DowngradeRegionReply {
832                region_id: RegionId::new(1024, 1),
833                last_entry_id: None,
834                metadata_last_entry_id: None,
835                exists: true,
836                error: None,
837            }),
838        );
839
840        let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
841        assert_eq!(
842            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
843            serialized
844        );
845
846        let upgrade_region_reply =
847            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
848                region_id: RegionId::new(1024, 1),
849                ready: true,
850                exists: true,
851                error: None,
852            }));
853        let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
854        assert_eq!(
855            r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
856            serialized
857        );
858    }
859
860    #[test]
861    fn test_deserialize_instruction() {
862        // legacy open region instruction
863        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
864        let open_region_instruction: Instruction =
865            serde_json::from_str(open_region_instruction).unwrap();
866        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
867            RegionIdent {
868                datanode_id: 2,
869                table_id: 1024,
870                region_number: 1,
871                engine: "mito2".to_string(),
872            },
873            "test/foo",
874            HashMap::new(),
875            HashMap::new(),
876            false,
877        )]);
878        assert_eq!(open_region_instruction, open_region);
879
880        // legacy close region instruction
881        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
882        let close_region_instruction: Instruction =
883            serde_json::from_str(close_region_instruction).unwrap();
884        let close_region = Instruction::CloseRegions(vec![RegionIdent {
885            datanode_id: 2,
886            table_id: 1024,
887            region_number: 1,
888            engine: "mito2".to_string(),
889        }]);
890        assert_eq!(close_region_instruction, close_region);
891
892        // legacy downgrade region instruction
893        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
894        let downgrade_region_instruction: Instruction =
895            serde_json::from_str(downgrade_region_instruction).unwrap();
896        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
897            region_id: RegionId::new(1024, 1),
898            flush_timeout: Some(Duration::from_millis(1000)),
899        }]);
900        assert_eq!(downgrade_region_instruction, downgrade_region);
901
902        // legacy upgrade region instruction
903        let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
904        let upgrade_region_instruction: Instruction =
905            serde_json::from_str(upgrade_region_instruction).unwrap();
906        let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
907            region_id: RegionId::new(1024, 1),
908            last_entry_id: None,
909            metadata_last_entry_id: None,
910            replay_timeout: Duration::from_millis(1000),
911            location_id: None,
912            replay_entry_id: None,
913            metadata_replay_entry_id: None,
914        }]);
915        assert_eq!(upgrade_region_instruction, upgrade_region);
916    }
917
918    #[test]
919    fn test_deserialize_instruction_reply() {
920        // legacy close region reply
921        let close_region_instruction_reply =
922            r#"{"result":true,"error":null,"type":"close_region"}"#;
923        let close_region_instruction_reply: InstructionReply =
924            serde_json::from_str(close_region_instruction_reply).unwrap();
925        let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
926            result: true,
927            error: None,
928        });
929        assert_eq!(close_region_instruction_reply, close_region_reply);
930
931        // legacy open region reply
932        let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
933        let open_region_instruction_reply: InstructionReply =
934            serde_json::from_str(open_region_instruction_reply).unwrap();
935        let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
936            result: true,
937            error: None,
938        });
939        assert_eq!(open_region_instruction_reply, open_region_reply);
940
941        // legacy downgrade region reply
942        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
943        let downgrade_region_instruction_reply: InstructionReply =
944            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
945        let downgrade_region_reply = InstructionReply::DowngradeRegions(
946            DowngradeRegionsReply::single(DowngradeRegionReply {
947                region_id: RegionId::new(1024, 1),
948                last_entry_id: None,
949                metadata_last_entry_id: None,
950                exists: true,
951                error: None,
952            }),
953        );
954        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
955
956        // legacy upgrade region reply
957        let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
958        let upgrade_region_instruction_reply: InstructionReply =
959            serde_json::from_str(upgrade_region_instruction_reply).unwrap();
960        let upgrade_region_reply =
961            InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
962                region_id: RegionId::new(1024, 1),
963                ready: true,
964                exists: true,
965                error: None,
966            }));
967        assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
968    }
969
970    #[derive(Debug, Clone, Serialize, Deserialize)]
971    struct LegacyOpenRegion {
972        region_ident: RegionIdent,
973        region_storage_path: String,
974        region_options: HashMap<String, String>,
975    }
976
977    #[test]
978    fn test_compatible_serialize_open_region() {
979        let region_ident = RegionIdent {
980            datanode_id: 2,
981            table_id: 1024,
982            region_number: 1,
983            engine: "mito2".to_string(),
984        };
985        let region_storage_path = "test/foo".to_string();
986        let region_options = HashMap::from([
987            ("a".to_string(), "aa".to_string()),
988            ("b".to_string(), "bb".to_string()),
989        ]);
990
991        // Serialize a legacy OpenRegion.
992        let legacy_open_region = LegacyOpenRegion {
993            region_ident: region_ident.clone(),
994            region_storage_path: region_storage_path.clone(),
995            region_options: region_options.clone(),
996        };
997        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
998
999        // Deserialize to OpenRegion.
1000        let deserialized = serde_json::from_str(&serialized).unwrap();
1001        let expected = OpenRegion {
1002            region_ident,
1003            region_storage_path,
1004            region_options,
1005            region_wal_options: HashMap::new(),
1006            skip_wal_replay: false,
1007        };
1008        assert_eq!(expected, deserialized);
1009    }
1010
1011    #[test]
1012    fn test_flush_regions_creation() {
1013        let region_id = RegionId::new(1024, 1);
1014
1015        // Single region sync flush
1016        let single_sync = FlushRegions::sync_single(region_id);
1017        assert_eq!(single_sync.region_ids, vec![region_id]);
1018        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
1019        assert!(!single_sync.is_hint());
1020        assert!(single_sync.is_sync());
1021        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1022        assert!(single_sync.is_single_region());
1023        assert_eq!(single_sync.single_region_id(), Some(region_id));
1024
1025        // Batch async flush (hint)
1026        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1027        let batch_async = FlushRegions::async_batch(region_ids.clone());
1028        assert_eq!(batch_async.region_ids, region_ids);
1029        assert_eq!(batch_async.strategy, FlushStrategy::Async);
1030        assert!(batch_async.is_hint());
1031        assert!(!batch_async.is_sync());
1032        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1033        assert!(!batch_async.is_single_region());
1034        assert_eq!(batch_async.single_region_id(), None);
1035
1036        // Batch sync flush
1037        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
1038        assert_eq!(batch_sync.region_ids, region_ids);
1039        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
1040        assert!(!batch_sync.is_hint());
1041        assert!(batch_sync.is_sync());
1042        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1043    }
1044
1045    #[test]
1046    fn test_flush_regions_conversion() {
1047        let region_id = RegionId::new(1024, 1);
1048
1049        let from_region_id: FlushRegions = region_id.into();
1050        assert_eq!(from_region_id.region_ids, vec![region_id]);
1051        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
1052        assert!(!from_region_id.is_hint());
1053        assert!(from_region_id.is_sync());
1054
1055        // Test default construction
1056        let flush_regions = FlushRegions {
1057            region_ids: vec![region_id],
1058            strategy: FlushStrategy::Async,
1059            error_strategy: FlushErrorStrategy::TryAll,
1060        };
1061        assert_eq!(flush_regions.region_ids, vec![region_id]);
1062        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1063        assert!(flush_regions.is_hint());
1064        assert!(!flush_regions.is_sync());
1065    }
1066
1067    #[test]
1068    fn test_flush_region_reply() {
1069        let region_id = RegionId::new(1024, 1);
1070
1071        // Successful single region reply
1072        let success_reply = FlushRegionReply::success_single(region_id);
1073        assert!(success_reply.overall_success);
1074        assert_eq!(success_reply.results.len(), 1);
1075        assert_eq!(success_reply.results[0].0, region_id);
1076        assert!(success_reply.results[0].1.is_ok());
1077
1078        // Failed single region reply
1079        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
1080        assert!(!error_reply.overall_success);
1081        assert_eq!(error_reply.results.len(), 1);
1082        assert_eq!(error_reply.results[0].0, region_id);
1083        assert!(error_reply.results[0].1.is_err());
1084
1085        // Batch reply
1086        let region_id2 = RegionId::new(1024, 2);
1087        let results = vec![
1088            (region_id, Ok(())),
1089            (region_id2, Err("flush failed".to_string())),
1090        ];
1091        let batch_reply = FlushRegionReply::from_results(results);
1092        assert!(!batch_reply.overall_success);
1093        assert_eq!(batch_reply.results.len(), 2);
1094
1095        // Conversion to SimpleReply
1096        let simple_reply = batch_reply.to_simple_reply();
1097        assert!(!simple_reply.result);
1098        assert!(simple_reply.error.is_some());
1099        assert!(simple_reply.error.unwrap().contains("flush failed"));
1100    }
1101
1102    #[test]
1103    fn test_serialize_flush_regions_instruction() {
1104        let region_id = RegionId::new(1024, 1);
1105        let flush_regions = FlushRegions::sync_single(region_id);
1106        let instruction = Instruction::FlushRegions(flush_regions.clone());
1107
1108        let serialized = serde_json::to_string(&instruction).unwrap();
1109        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1110
1111        match deserialized {
1112            Instruction::FlushRegions(fr) => {
1113                assert_eq!(fr.region_ids, vec![region_id]);
1114                assert_eq!(fr.strategy, FlushStrategy::Sync);
1115                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1116            }
1117            _ => panic!("Expected FlushRegions instruction"),
1118        }
1119    }
1120
1121    #[test]
1122    fn test_serialize_flush_regions_batch_instruction() {
1123        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
1124        let flush_regions =
1125            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
1126        let instruction = Instruction::FlushRegions(flush_regions);
1127
1128        let serialized = serde_json::to_string(&instruction).unwrap();
1129        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1130
1131        match deserialized {
1132            Instruction::FlushRegions(fr) => {
1133                assert_eq!(fr.region_ids, region_ids);
1134                assert_eq!(fr.strategy, FlushStrategy::Sync);
1135                assert!(!fr.is_hint());
1136                assert!(fr.is_sync());
1137                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1138            }
1139            _ => panic!("Expected FlushRegions instruction"),
1140        }
1141    }
1142
1143    #[test]
1144    fn test_serialize_get_file_refs_instruction_reply() {
1145        let mut manifest = FileRefsManifest::default();
1146        let r0 = RegionId::new(1024, 1);
1147        let r1 = RegionId::new(1024, 2);
1148        manifest
1149            .file_refs
1150            .insert(r0, HashSet::from([FileId::random()]));
1151        manifest
1152            .file_refs
1153            .insert(r1, HashSet::from([FileId::random()]));
1154        manifest.manifest_version.insert(r0, 10);
1155        manifest.manifest_version.insert(r1, 20);
1156
1157        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
1158            file_refs_manifest: manifest,
1159            success: true,
1160            error: None,
1161        });
1162
1163        let serialized = serde_json::to_string(&instruction_reply).unwrap();
1164        let deserialized = serde_json::from_str(&serialized).unwrap();
1165
1166        assert_eq!(instruction_reply, deserialized);
1167    }
1168}