common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
/// Identifies a region by datanode id, table id, region number and engine name.
///
/// Field order is part of the serialized JSON layout asserted by the tests in
/// this file; do not reorder fields.
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct RegionIdent {
    /// Id of the datanode this identifier refers to.
    pub datanode_id: DatanodeId,
    /// Id of the table; combined with `region_number` to build a [RegionId].
    pub table_id: TableId,
    /// Number of the region; combined with `table_id` to build a [RegionId].
    pub region_number: RegionNumber,
    /// Name of the engine (the tests in this file use `"mito2"`).
    pub engine: String,
}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
/// The result of downgrading a leader region.
///
/// Carried by [InstructionReply::DowngradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
    /// The `last_entry_id`, if available.
    pub last_entry_id: Option<u64>,
    /// The `metadata_last_entry_id`, if available (only available for the metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// The error raised during the operation, if any.
    pub error: Option<String>,
}
67
68impl Display for DowngradeRegionReply {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        write!(
71            f,
72            "(last_entry_id={:?}, exists={}, error={:?})",
73            self.last_entry_id, self.exists, self.error
74        )
75    }
76}
77
/// A minimal reply: a success flag plus an optional error message.
///
/// Used by [InstructionReply::OpenRegion] and [InstructionReply::CloseRegion],
/// and produced by [FlushRegionReply::to_simple_reply].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
    /// Whether the operation succeeded.
    pub result: bool,
    /// The error message, if any.
    pub error: Option<String>,
}
83
/// Reply for flush region operations with support for batch results.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct FlushRegionReply {
    /// Results for each region that was attempted to be flushed.
    /// For single region flushes, this will contain one result.
    /// For batch flushes, this contains results for all attempted regions.
    /// A failed flush carries its error message as a `String`.
    pub results: Vec<(RegionId, Result<(), String>)>,
    /// Overall success: true if all regions were flushed successfully.
    ///
    /// The constructors on this type keep it in sync with `results`; since the
    /// field is public, code building the struct by hand must do so itself.
    pub overall_success: bool,
}
94
95impl FlushRegionReply {
96    /// Create a successful single region reply.
97    pub fn success_single(region_id: RegionId) -> Self {
98        Self {
99            results: vec![(region_id, Ok(()))],
100            overall_success: true,
101        }
102    }
103
104    /// Create a failed single region reply.
105    pub fn error_single(region_id: RegionId, error: String) -> Self {
106        Self {
107            results: vec![(region_id, Err(error))],
108            overall_success: false,
109        }
110    }
111
112    /// Create a batch reply from individual results.
113    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
114        let overall_success = results.iter().all(|(_, result)| result.is_ok());
115        Self {
116            results,
117            overall_success,
118        }
119    }
120
121    /// Convert to SimpleReply for backward compatibility.
122    pub fn to_simple_reply(&self) -> SimpleReply {
123        if self.overall_success {
124            SimpleReply {
125                result: true,
126                error: None,
127            }
128        } else {
129            let errors: Vec<String> = self
130                .results
131                .iter()
132                .filter_map(|(region_id, result)| {
133                    result
134                        .as_ref()
135                        .err()
136                        .map(|err| format!("{}: {}", region_id, err))
137                })
138                .collect();
139            SimpleReply {
140                result: false,
141                error: Some(errors.join("; ")),
142            }
143        }
144    }
145}
146
147impl Display for SimpleReply {
148    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149        write!(f, "(result={}, error={:?})", self.result, self.error)
150    }
151}
152
153impl Display for FlushRegionReply {
154    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155        let results_str = self
156            .results
157            .iter()
158            .map(|(region_id, result)| match result {
159                Ok(()) => format!("{}:OK", region_id),
160                Err(err) => format!("{}:ERR({})", region_id, err),
161            })
162            .collect::<Vec<_>>()
163            .join(", ");
164        write!(
165            f,
166            "(overall_success={}, results=[{}])",
167            self.overall_success, results_str
168        )
169    }
170}
171
172impl Display for OpenRegion {
173    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
174        write!(
175            f,
176            "OpenRegion(region_ident={}, region_storage_path={})",
177            self.region_ident, self.region_storage_path
178        )
179    }
180}
181
/// The payload of [Instruction::OpenRegion].
///
/// `region_wal_options` and `skip_wal_replay` are marked `#[serde(default)]`,
/// so a payload that lacks them still deserializes (see
/// `test_compatible_serialize_open_region`).
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
    /// Identifier of the region to open.
    pub region_ident: RegionIdent,
    /// Storage path of the region.
    pub region_storage_path: String,
    /// Region options as string key/value pairs.
    pub region_options: HashMap<String, String>,
    /// WAL options keyed by region number. Keys are (de)serialized through
    /// their string form (`DisplayFromStr`).
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
    /// Presumably tells the receiver to skip WAL replay when opening the
    /// region — the consumer is outside this file; defaults to `false`.
    #[serde(default)]
    pub skip_wal_replay: bool,
}
194
195impl OpenRegion {
196    pub fn new(
197        region_ident: RegionIdent,
198        path: &str,
199        region_options: HashMap<String, String>,
200        region_wal_options: HashMap<RegionNumber, String>,
201        skip_wal_replay: bool,
202    ) -> Self {
203        Self {
204            region_ident,
205            region_storage_path: path.to_string(),
206            region_options,
207            region_wal_options,
208            skip_wal_replay,
209        }
210    }
211}
212
/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The timeout for waiting on the region flush.
    ///
    /// `None` means the region is not flushed before being downgraded.
    /// Defaults to `None` when the field is missing from the payload.
    #[serde(default)]
    pub flush_timeout: Option<Duration>,
}
224
225impl Display for DowngradeRegion {
226    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
227        write!(
228            f,
229            "DowngradeRegion(region_id={}, flush_timeout={:?})",
230            self.region_id, self.flush_timeout,
231        )
232    }
233}
234
/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UpgradeRegion {
    /// The [RegionId].
    pub region_id: RegionId,
    /// The `last_entry_id` of old leader region.
    pub last_entry_id: Option<u64>,
    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
    pub metadata_last_entry_id: Option<u64>,
    /// The timeout of waiting for a wal replay.
    ///
    /// `None` stands for no wait,
    /// it's helpful to verify whether the leader region is ready.
    ///
    /// (De)serialized through `humantime_serde`.
    // NOTE(review): unlike the optional fields below, this one carries no
    // `#[serde(default)]` — confirm whether a payload that omits
    // `replay_timeout` is expected to deserialize.
    #[serde(with = "humantime_serde")]
    pub replay_timeout: Option<Duration>,
    /// The hint for replaying memtable.
    #[serde(default)]
    pub location_id: Option<u64>,
    /// The replay entry id; omitted from the serialized form when `None`.
    /// Exact semantics are defined by the consumer — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay_entry_id: Option<u64>,
    /// The metadata replay entry id; omitted from the serialized form when `None`.
    /// Exact semantics are defined by the consumer — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata_replay_entry_id: Option<u64>,
}
258
259impl UpgradeRegion {
260    /// Sets the replay entry id.
261    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
262        self.replay_entry_id = replay_entry_id;
263        self
264    }
265
266    /// Sets the metadata replay entry id.
267    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
268        self.metadata_replay_entry_id = metadata_replay_entry_id;
269        self
270    }
271}
272
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
/// The identifier of cache.
///
/// A list of these is carried by [Instruction::InvalidateCaches].
pub enum CacheIdent {
    /// Identifies a cache entry by flow id.
    FlowId(FlowId),
    /// Indicate change of address of flownode.
    FlowNodeAddressChange(u64),
    /// Identifies a cache entry by flow name.
    FlowName(FlowName),
    /// Identifies a cache entry by table id.
    TableId(TableId),
    /// Identifies a cache entry by table name.
    TableName(TableName),
    /// Identifies a cache entry by schema name.
    SchemaName(SchemaName),
    /// Carries the details of a created flow (see [CreateFlow]).
    CreateFlow(CreateFlow),
    /// Carries the details of a dropped flow (see [DropFlow]).
    DropFlow(DropFlow),
}
286
/// The payload of [CacheIdent::CreateFlow].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
    /// The unique identifier for the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to peer information
    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
295
/// The payload of [CacheIdent::DropFlow].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
    /// The identifier of the flow.
    pub flow_id: FlowId,
    /// Ids of the flow's source tables.
    pub source_table_ids: Vec<TableId>,
    /// Mapping of flow partition to flownode id
    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
303
/// Strategy for executing flush operations.
///
/// The default is [FlushStrategy::Sync].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushStrategy {
    /// Synchronous operation that waits for completion and expects a reply
    #[default]
    Sync,
    /// Asynchronous hint operation (fire-and-forget, no reply expected)
    Async,
}
313
/// Error handling strategy for batch flush operations.
///
/// The default is [FlushErrorStrategy::FailFast].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushErrorStrategy {
    /// Abort on first error (fail-fast)
    #[default]
    FailFast,
    /// Attempt to flush all regions and collect all errors
    TryAll,
}
323
/// Unified flush instruction supporting both single and batch operations
/// with configurable execution strategies and error handling.
///
/// `strategy` and `error_strategy` are marked `#[serde(default)]`, so a payload
/// that lacks them deserializes to `Sync` / `FailFast`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
    /// List of region IDs to flush. Can contain a single region or multiple regions.
    pub region_ids: Vec<RegionId>,
    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
    #[serde(default)]
    pub strategy: FlushStrategy,
    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
    #[serde(default)]
    pub error_strategy: FlushErrorStrategy,
}
337
338impl FlushRegions {
339    /// Create synchronous single-region flush
340    pub fn sync_single(region_id: RegionId) -> Self {
341        Self {
342            region_ids: vec![region_id],
343            strategy: FlushStrategy::Sync,
344            error_strategy: FlushErrorStrategy::FailFast,
345        }
346    }
347
348    /// Create asynchronous batch flush (fire-and-forget)
349    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
350        Self {
351            region_ids,
352            strategy: FlushStrategy::Async,
353            error_strategy: FlushErrorStrategy::TryAll,
354        }
355    }
356
357    /// Create synchronous batch flush with error strategy
358    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
359        Self {
360            region_ids,
361            strategy: FlushStrategy::Sync,
362            error_strategy,
363        }
364    }
365
366    /// Check if this is a single region flush.
367    pub fn is_single_region(&self) -> bool {
368        self.region_ids.len() == 1
369    }
370
371    /// Get the single region ID if this is a single region flush.
372    pub fn single_region_id(&self) -> Option<RegionId> {
373        if self.is_single_region() {
374            self.region_ids.first().copied()
375        } else {
376            None
377        }
378    }
379
380    /// Check if this is a hint (asynchronous) operation.
381    pub fn is_hint(&self) -> bool {
382        matches!(self.strategy, FlushStrategy::Async)
383    }
384
385    /// Check if this is a synchronous operation.
386    pub fn is_sync(&self) -> bool {
387        matches!(self.strategy, FlushStrategy::Sync)
388    }
389}
390
391impl From<RegionId> for FlushRegions {
392    fn from(region_id: RegionId) -> Self {
393        Self::sync_single(region_id)
394    }
395}
396
/// An instruction together with its payload.
///
/// With the derived serde implementation each value serializes as a
/// single-key JSON object named after the variant, e.g. `{"OpenRegion":{...}}`
/// (see `test_serialize_instruction`).
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
    /// Opens a region.
    ///
    /// - Returns true if a specified region exists.
    OpenRegion(OpenRegion),
    /// Closes a region.
    ///
    /// - Returns true if a specified region does not exist.
    CloseRegion(RegionIdent),
    /// Upgrades a region.
    UpgradeRegion(UpgradeRegion),
    /// Downgrades a region.
    DowngradeRegion(DowngradeRegion),
    /// Invalidates batch cache.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegions(FlushRegions),
}
416
/// The reply of [UpgradeRegion].
///
/// Carried by [InstructionReply::UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
    /// Returns true if `last_entry_id` has been replayed to the latest.
    pub ready: bool,
    /// Indicates whether the region exists.
    pub exists: bool,
    /// Returns error if any.
    pub error: Option<String>,
}
427
428impl Display for UpgradeRegionReply {
429    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
430        write!(
431            f,
432            "(ready={}, exists={}, error={:?})",
433            self.ready, self.exists, self.error
434        )
435    }
436}
437
/// The reply to an [Instruction].
///
/// Serialized with an internal `"type"` tag whose value is the variant name
/// in snake_case (see the serde attributes below).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
    /// Reply to an open-region instruction.
    OpenRegion(SimpleReply),
    /// Reply to a close-region instruction.
    CloseRegion(SimpleReply),
    /// Reply to an upgrade-region instruction.
    UpgradeRegion(UpgradeRegionReply),
    /// Reply to a downgrade-region instruction.
    DowngradeRegion(DowngradeRegionReply),
    /// Reply to a flush-regions instruction.
    FlushRegions(FlushRegionReply),
}
447
448impl Display for InstructionReply {
449    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
450        match self {
451            Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
452            Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
453            Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
454            Self::DowngradeRegion(reply) => {
455                write!(f, "InstructionReply::DowngradeRegion({})", reply)
456            }
457            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
458        }
459    }
460}
461
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the exact JSON layout of the `OpenRegion` and `CloseRegion` instructions.
    #[test]
    fn test_serialize_instruction() {
        let open_region = Instruction::OpenRegion(OpenRegion::new(
            RegionIdent {
                datanode_id: 2,
                table_id: 1024,
                region_number: 1,
                engine: "mito2".to_string(),
            },
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
        ));

        let serialized = serde_json::to_string(&open_region).unwrap();

        assert_eq!(
            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
            serialized
        );

        let close_region = Instruction::CloseRegion(RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        });

        let serialized = serde_json::to_string(&close_region).unwrap();

        assert_eq!(
            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
            serialized
        );
    }

    /// The shape of `OpenRegion` before `region_wal_options` and
    /// `skip_wal_replay` were added; used to check backward compatibility.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }

    /// A payload without the newer fields must still deserialize, with those
    /// fields falling back to their defaults.
    #[test]
    fn test_compatible_serialize_open_region() {
        let region_ident = RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        };
        let region_storage_path = "test/foo".to_string();
        let region_options = HashMap::from([
            ("a".to_string(), "aa".to_string()),
            ("b".to_string(), "bb".to_string()),
        ]);

        // Serialize a legacy OpenRegion.
        let legacy_open_region = LegacyOpenRegion {
            region_ident: region_ident.clone(),
            region_storage_path: region_storage_path.clone(),
            region_options: region_options.clone(),
        };
        let serialized = serde_json::to_string(&legacy_open_region).unwrap();

        // Deserialize to OpenRegion.
        let deserialized: OpenRegion = serde_json::from_str(&serialized).unwrap();
        let expected = OpenRegion {
            region_ident,
            region_storage_path,
            region_options,
            region_wal_options: HashMap::new(),
            skip_wal_replay: false,
        };
        assert_eq!(expected, deserialized);
    }

    #[test]
    fn test_flush_regions_creation() {
        let region_id = RegionId::new(1024, 1);

        // Single region sync flush
        let single_sync = FlushRegions::sync_single(region_id);
        assert_eq!(single_sync.region_ids, vec![region_id]);
        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
        assert!(!single_sync.is_hint());
        assert!(single_sync.is_sync());
        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
        assert!(single_sync.is_single_region());
        assert_eq!(single_sync.single_region_id(), Some(region_id));

        // Batch async flush (hint)
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
        let batch_async = FlushRegions::async_batch(region_ids.clone());
        assert_eq!(batch_async.region_ids, region_ids);
        assert_eq!(batch_async.strategy, FlushStrategy::Async);
        assert!(batch_async.is_hint());
        assert!(!batch_async.is_sync());
        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
        assert!(!batch_async.is_single_region());
        assert_eq!(batch_async.single_region_id(), None);

        // Batch sync flush
        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        assert_eq!(batch_sync.region_ids, region_ids);
        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
        assert!(!batch_sync.is_hint());
        assert!(batch_sync.is_sync());
        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
    }

    #[test]
    fn test_flush_regions_conversion() {
        let region_id = RegionId::new(1024, 1);

        let from_region_id: FlushRegions = region_id.into();
        assert_eq!(from_region_id.region_ids, vec![region_id]);
        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
        assert!(!from_region_id.is_hint());
        assert!(from_region_id.is_sync());

        // Explicit construction through a struct literal.
        let flush_regions = FlushRegions {
            region_ids: vec![region_id],
            strategy: FlushStrategy::Async,
            error_strategy: FlushErrorStrategy::TryAll,
        };
        assert_eq!(flush_regions.region_ids, vec![region_id]);
        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
        assert!(flush_regions.is_hint());
        assert!(!flush_regions.is_sync());
    }

    /// The `#[default]` variants are what `#[serde(default)]` falls back to.
    #[test]
    fn test_flush_strategy_defaults() {
        assert_eq!(FlushStrategy::default(), FlushStrategy::Sync);
        assert_eq!(FlushErrorStrategy::default(), FlushErrorStrategy::FailFast);
    }

    /// A payload that lacks `strategy` and `error_strategy` must deserialize
    /// to a synchronous, fail-fast flush.
    #[test]
    fn test_deserialize_flush_regions_without_strategies() {
        let region_id = RegionId::new(1024, 1);

        // Start from a non-default instruction so the fallback is observable.
        let mut value = serde_json::to_value(FlushRegions::async_batch(vec![region_id])).unwrap();
        let fields = value.as_object_mut().unwrap();
        assert!(fields.remove("strategy").is_some());
        assert!(fields.remove("error_strategy").is_some());

        let deserialized: FlushRegions = serde_json::from_value(value).unwrap();
        assert_eq!(deserialized, FlushRegions::sync_single(region_id));
    }

    #[test]
    fn test_flush_region_reply() {
        let region_id = RegionId::new(1024, 1);

        // Successful single region reply
        let success_reply = FlushRegionReply::success_single(region_id);
        assert!(success_reply.overall_success);
        assert_eq!(success_reply.results.len(), 1);
        assert_eq!(success_reply.results[0].0, region_id);
        assert!(success_reply.results[0].1.is_ok());
        assert_eq!(
            success_reply.to_simple_reply(),
            SimpleReply {
                result: true,
                error: None,
            }
        );

        // Failed single region reply
        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
        assert!(!error_reply.overall_success);
        assert_eq!(error_reply.results.len(), 1);
        assert_eq!(error_reply.results[0].0, region_id);
        assert!(error_reply.results[0].1.is_err());

        // Batch reply
        let region_id2 = RegionId::new(1024, 2);
        let results = vec![
            (region_id, Ok(())),
            (region_id2, Err("flush failed".to_string())),
        ];
        let batch_reply = FlushRegionReply::from_results(results);
        assert!(!batch_reply.overall_success);
        assert_eq!(batch_reply.results.len(), 2);

        // Conversion to SimpleReply: only the failed region is reported.
        let simple_reply = batch_reply.to_simple_reply();
        assert!(!simple_reply.result);
        assert_eq!(
            simple_reply.error,
            Some(format!("{}: flush failed", region_id2))
        );
    }

    #[test]
    fn test_serialize_flush_regions_instruction() {
        let region_id = RegionId::new(1024, 1);
        let flush_regions = FlushRegions::sync_single(region_id);
        let instruction = Instruction::FlushRegions(flush_regions.clone());

        let serialized = serde_json::to_string(&instruction).unwrap();
        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();

        match deserialized {
            Instruction::FlushRegions(fr) => {
                assert_eq!(fr.region_ids, vec![region_id]);
                assert_eq!(fr.strategy, FlushStrategy::Sync);
                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
            }
            _ => panic!("Expected FlushRegions instruction"),
        }
    }

    #[test]
    fn test_serialize_flush_regions_batch_instruction() {
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
        let flush_regions =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let instruction = Instruction::FlushRegions(flush_regions);

        let serialized = serde_json::to_string(&instruction).unwrap();
        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();

        match deserialized {
            Instruction::FlushRegions(fr) => {
                assert_eq!(fr.region_ids, region_ids);
                assert_eq!(fr.strategy, FlushStrategy::Sync);
                assert!(!fr.is_hint());
                assert!(fr.is_sync());
                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
            }
            _ => panic!("Expected FlushRegions instruction"),
        }
    }
}