common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// Returns the `last_entry_id` if available.
59    pub last_entry_id: Option<u64>,
60    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
61    pub metadata_last_entry_id: Option<u64>,
62    /// Indicates whether the region exists.
63    pub exists: bool,
64    /// Return error if any during the operation.
65    pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        write!(
71            f,
72            "(last_entry_id={:?}, exists={}, error={:?})",
73            self.last_entry_id, self.exists, self.error
74        )
75    }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80    pub result: bool,
81    pub error: Option<String>,
82}
83
84/// Reply for flush region operations with support for batch results.
85#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
86pub struct FlushRegionReply {
87    /// Results for each region that was attempted to be flushed.
88    /// For single region flushes, this will contain one result.
89    /// For batch flushes, this contains results for all attempted regions.
90    pub results: Vec<(RegionId, Result<(), String>)>,
91    /// Overall success: true if all regions were flushed successfully.
92    pub overall_success: bool,
93}
94
95impl FlushRegionReply {
96    /// Create a successful single region reply.
97    pub fn success_single(region_id: RegionId) -> Self {
98        Self {
99            results: vec![(region_id, Ok(()))],
100            overall_success: true,
101        }
102    }
103
104    /// Create a failed single region reply.
105    pub fn error_single(region_id: RegionId, error: String) -> Self {
106        Self {
107            results: vec![(region_id, Err(error))],
108            overall_success: false,
109        }
110    }
111
112    /// Create a batch reply from individual results.
113    pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
114        let overall_success = results.iter().all(|(_, result)| result.is_ok());
115        Self {
116            results,
117            overall_success,
118        }
119    }
120
121    /// Convert to SimpleReply for backward compatibility.
122    pub fn to_simple_reply(&self) -> SimpleReply {
123        if self.overall_success {
124            SimpleReply {
125                result: true,
126                error: None,
127            }
128        } else {
129            let errors: Vec<String> = self
130                .results
131                .iter()
132                .filter_map(|(region_id, result)| {
133                    result
134                        .as_ref()
135                        .err()
136                        .map(|err| format!("{}: {}", region_id, err))
137                })
138                .collect();
139            SimpleReply {
140                result: false,
141                error: Some(errors.join("; ")),
142            }
143        }
144    }
145}
146
147impl Display for SimpleReply {
148    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149        write!(f, "(result={}, error={:?})", self.result, self.error)
150    }
151}
152
153impl Display for FlushRegionReply {
154    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155        let results_str = self
156            .results
157            .iter()
158            .map(|(region_id, result)| match result {
159                Ok(()) => format!("{}:OK", region_id),
160                Err(err) => format!("{}:ERR({})", region_id, err),
161            })
162            .collect::<Vec<_>>()
163            .join(", ");
164        write!(
165            f,
166            "(overall_success={}, results=[{}])",
167            self.overall_success, results_str
168        )
169    }
170}
171
172impl Display for OpenRegion {
173    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
174        write!(
175            f,
176            "OpenRegion(region_ident={}, region_storage_path={})",
177            self.region_ident, self.region_storage_path
178        )
179    }
180}
181
182#[serde_with::serde_as]
183#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
184pub struct OpenRegion {
185    pub region_ident: RegionIdent,
186    pub region_storage_path: String,
187    pub region_options: HashMap<String, String>,
188    #[serde(default)]
189    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
190    pub region_wal_options: HashMap<RegionNumber, String>,
191    #[serde(default)]
192    pub skip_wal_replay: bool,
193}
194
195impl OpenRegion {
196    pub fn new(
197        region_ident: RegionIdent,
198        path: &str,
199        region_options: HashMap<String, String>,
200        region_wal_options: HashMap<RegionNumber, String>,
201        skip_wal_replay: bool,
202    ) -> Self {
203        Self {
204            region_ident,
205            region_storage_path: path.to_string(),
206            region_options,
207            region_wal_options,
208            skip_wal_replay,
209        }
210    }
211}
212
213/// The instruction of downgrading leader region.
214#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
215pub struct DowngradeRegion {
216    /// The [RegionId].
217    pub region_id: RegionId,
218    /// The timeout of waiting for flush the region.
219    ///
220    /// `None` stands for don't flush before downgrading the region.
221    #[serde(default)]
222    pub flush_timeout: Option<Duration>,
223}
224
225impl Display for DowngradeRegion {
226    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
227        write!(
228            f,
229            "DowngradeRegion(region_id={}, flush_timeout={:?})",
230            self.region_id, self.flush_timeout,
231        )
232    }
233}
234
235/// Upgrades a follower region to leader region.
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237pub struct UpgradeRegion {
238    /// The [RegionId].
239    pub region_id: RegionId,
240    /// The `last_entry_id` of old leader region.
241    pub last_entry_id: Option<u64>,
242    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
243    pub metadata_last_entry_id: Option<u64>,
244    /// The timeout of waiting for a wal replay.
245    ///
246    /// `None` stands for no wait,
247    /// it's helpful to verify whether the leader region is ready.
248    #[serde(with = "humantime_serde")]
249    pub replay_timeout: Option<Duration>,
250    /// The hint for replaying memtable.
251    #[serde(default)]
252    pub location_id: Option<u64>,
253    #[serde(default, skip_serializing_if = "Option::is_none")]
254    pub replay_entry_id: Option<u64>,
255    #[serde(default, skip_serializing_if = "Option::is_none")]
256    pub metadata_replay_entry_id: Option<u64>,
257}
258
259impl UpgradeRegion {
260    /// Sets the replay entry id.
261    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
262        self.replay_entry_id = replay_entry_id;
263        self
264    }
265
266    /// Sets the metadata replay entry id.
267    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
268        self.metadata_replay_entry_id = metadata_replay_entry_id;
269        self
270    }
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
274/// The identifier of cache.
275pub enum CacheIdent {
276    FlowId(FlowId),
277    /// Indicate change of address of flownode.
278    FlowNodeAddressChange(u64),
279    FlowName(FlowName),
280    TableId(TableId),
281    TableName(TableName),
282    SchemaName(SchemaName),
283    CreateFlow(CreateFlow),
284    DropFlow(DropFlow),
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
288pub struct CreateFlow {
289    /// The unique identifier for the flow.
290    pub flow_id: FlowId,
291    pub source_table_ids: Vec<TableId>,
292    /// Mapping of flow partition to peer information
293    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
294}
295
296#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
297pub struct DropFlow {
298    pub flow_id: FlowId,
299    pub source_table_ids: Vec<TableId>,
300    /// Mapping of flow partition to flownode id
301    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
302}
303
304/// Strategy for executing flush operations.
305#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
306pub enum FlushStrategy {
307    /// Synchronous operation that waits for completion and expects a reply
308    Sync,
309    /// Asynchronous hint operation (fire-and-forget, no reply expected)
310    Async,
311}
312
313/// Error handling strategy for batch flush operations.
314#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
315pub enum FlushErrorStrategy {
316    /// Abort on first error (fail-fast)
317    FailFast,
318    /// Attempt to flush all regions and collect all errors
319    TryAll,
320}
321
322impl Default for FlushStrategy {
323    fn default() -> Self {
324        Self::Sync
325    }
326}
327
328impl Default for FlushErrorStrategy {
329    fn default() -> Self {
330        Self::FailFast
331    }
332}
333
334/// Unified flush instruction supporting both single and batch operations
335/// with configurable execution strategies and error handling.
336#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
337pub struct FlushRegions {
338    /// List of region IDs to flush. Can contain a single region or multiple regions.
339    pub region_ids: Vec<RegionId>,
340    /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
341    #[serde(default)]
342    pub strategy: FlushStrategy,
343    /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
344    #[serde(default)]
345    pub error_strategy: FlushErrorStrategy,
346}
347
348impl FlushRegions {
349    /// Create synchronous single-region flush
350    pub fn sync_single(region_id: RegionId) -> Self {
351        Self {
352            region_ids: vec![region_id],
353            strategy: FlushStrategy::Sync,
354            error_strategy: FlushErrorStrategy::FailFast,
355        }
356    }
357
358    /// Create asynchronous batch flush (fire-and-forget)
359    pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
360        Self {
361            region_ids,
362            strategy: FlushStrategy::Async,
363            error_strategy: FlushErrorStrategy::TryAll,
364        }
365    }
366
367    /// Create synchronous batch flush with error strategy
368    pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
369        Self {
370            region_ids,
371            strategy: FlushStrategy::Sync,
372            error_strategy,
373        }
374    }
375
376    /// Check if this is a single region flush.
377    pub fn is_single_region(&self) -> bool {
378        self.region_ids.len() == 1
379    }
380
381    /// Get the single region ID if this is a single region flush.
382    pub fn single_region_id(&self) -> Option<RegionId> {
383        if self.is_single_region() {
384            self.region_ids.first().copied()
385        } else {
386            None
387        }
388    }
389
390    /// Check if this is a hint (asynchronous) operation.
391    pub fn is_hint(&self) -> bool {
392        matches!(self.strategy, FlushStrategy::Async)
393    }
394
395    /// Check if this is a synchronous operation.
396    pub fn is_sync(&self) -> bool {
397        matches!(self.strategy, FlushStrategy::Sync)
398    }
399}
400
401impl From<RegionId> for FlushRegions {
402    fn from(region_id: RegionId) -> Self {
403        Self::sync_single(region_id)
404    }
405}
406
407#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
408pub enum Instruction {
409    /// Opens a region.
410    ///
411    /// - Returns true if a specified region exists.
412    OpenRegion(OpenRegion),
413    /// Closes a region.
414    ///
415    /// - Returns true if a specified region does not exist.
416    CloseRegion(RegionIdent),
417    /// Upgrades a region.
418    UpgradeRegion(UpgradeRegion),
419    /// Downgrades a region.
420    DowngradeRegion(DowngradeRegion),
421    /// Invalidates batch cache.
422    InvalidateCaches(Vec<CacheIdent>),
423    /// Flushes regions.
424    FlushRegions(FlushRegions),
425}
426
427/// The reply of [UpgradeRegion].
428#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
429pub struct UpgradeRegionReply {
430    /// Returns true if `last_entry_id` has been replayed to the latest.
431    pub ready: bool,
432    /// Indicates whether the region exists.
433    pub exists: bool,
434    /// Returns error if any.
435    pub error: Option<String>,
436}
437
438impl Display for UpgradeRegionReply {
439    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
440        write!(
441            f,
442            "(ready={}, exists={}, error={:?})",
443            self.ready, self.exists, self.error
444        )
445    }
446}
447
448#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
449#[serde(tag = "type", rename_all = "snake_case")]
450pub enum InstructionReply {
451    OpenRegion(SimpleReply),
452    CloseRegion(SimpleReply),
453    UpgradeRegion(UpgradeRegionReply),
454    DowngradeRegion(DowngradeRegionReply),
455    FlushRegions(FlushRegionReply),
456}
457
458impl Display for InstructionReply {
459    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
460        match self {
461            Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
462            Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
463            Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
464            Self::DowngradeRegion(reply) => {
465                write!(f, "InstructionReply::DowngradeRegion({})", reply)
466            }
467            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
468        }
469    }
470}
471
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_serialize_instruction() {
        let open_region = Instruction::OpenRegion(OpenRegion::new(
            RegionIdent {
                datanode_id: 2,
                table_id: 1024,
                region_number: 1,
                engine: "mito2".to_string(),
            },
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
        ));

        let serialized = serde_json::to_string(&open_region).unwrap();

        assert_eq!(
            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
            serialized
        );

        let close_region = Instruction::CloseRegion(RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        });

        let serialized = serde_json::to_string(&close_region).unwrap();

        assert_eq!(
            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
            serialized
        );
    }

    /// A legacy [`OpenRegion`] payload without `region_wal_options` / `skip_wal_replay`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }

    #[test]
    fn test_compatible_serialize_open_region() {
        let region_ident = RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        };
        let region_storage_path = "test/foo".to_string();
        let region_options = HashMap::from([
            ("a".to_string(), "aa".to_string()),
            ("b".to_string(), "bb".to_string()),
        ]);

        // Serialize a legacy OpenRegion.
        let legacy_open_region = LegacyOpenRegion {
            region_ident: region_ident.clone(),
            region_storage_path: region_storage_path.clone(),
            region_options: region_options.clone(),
        };
        let serialized = serde_json::to_string(&legacy_open_region).unwrap();

        // Deserialize to OpenRegion; the newer fields must fall back to their defaults.
        let deserialized = serde_json::from_str(&serialized).unwrap();
        let expected = OpenRegion {
            region_ident,
            region_storage_path,
            region_options,
            region_wal_options: HashMap::new(),
            skip_wal_replay: false,
        };
        assert_eq!(expected, deserialized);
    }

    #[test]
    fn test_flush_regions_creation() {
        let region_id = RegionId::new(1024, 1);

        // Single region sync flush
        let single_sync = FlushRegions::sync_single(region_id);
        assert_eq!(single_sync.region_ids, vec![region_id]);
        assert_eq!(single_sync.strategy, FlushStrategy::Sync);
        assert!(!single_sync.is_hint());
        assert!(single_sync.is_sync());
        assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
        assert!(single_sync.is_single_region());
        assert_eq!(single_sync.single_region_id(), Some(region_id));

        // Batch async flush (hint)
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
        let batch_async = FlushRegions::async_batch(region_ids.clone());
        assert_eq!(batch_async.region_ids, region_ids);
        assert_eq!(batch_async.strategy, FlushStrategy::Async);
        assert!(batch_async.is_hint());
        assert!(!batch_async.is_sync());
        assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
        assert!(!batch_async.is_single_region());
        assert_eq!(batch_async.single_region_id(), None);

        // Batch sync flush
        let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        assert_eq!(batch_sync.region_ids, region_ids);
        assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
        assert!(!batch_sync.is_hint());
        assert!(batch_sync.is_sync());
        assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);

        // Empty batch is neither single-region nor has a single region id.
        let empty = FlushRegions::sync_batch(vec![], FlushErrorStrategy::TryAll);
        assert!(!empty.is_single_region());
        assert_eq!(empty.single_region_id(), None);
    }

    #[test]
    fn test_flush_regions_conversion() {
        let region_id = RegionId::new(1024, 1);

        let from_region_id: FlushRegions = region_id.into();
        assert_eq!(from_region_id.region_ids, vec![region_id]);
        assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
        assert!(!from_region_id.is_hint());
        assert!(from_region_id.is_sync());

        // Explicit construction with an asynchronous strategy.
        let flush_regions = FlushRegions {
            region_ids: vec![region_id],
            strategy: FlushStrategy::Async,
            error_strategy: FlushErrorStrategy::TryAll,
        };
        assert_eq!(flush_regions.region_ids, vec![region_id]);
        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
        assert!(flush_regions.is_hint());
        assert!(!flush_regions.is_sync());
    }

    #[test]
    fn test_flush_strategy_defaults() {
        assert_eq!(FlushStrategy::default(), FlushStrategy::Sync);
        assert_eq!(FlushErrorStrategy::default(), FlushErrorStrategy::FailFast);
    }

    #[test]
    fn test_deserialize_flush_regions_with_missing_strategies() {
        let region_id = RegionId::new(1024, 1);

        // Serialize a fully specified instruction, then strip the defaulted fields to
        // mimic a payload that does not carry them.
        let mut payload =
            serde_json::to_value(FlushRegions::async_batch(vec![region_id])).unwrap();
        let object = payload.as_object_mut().unwrap();
        assert!(object.remove("strategy").is_some());
        assert!(object.remove("error_strategy").is_some());

        let deserialized: FlushRegions = serde_json::from_value(payload).unwrap();
        assert_eq!(deserialized, FlushRegions::sync_single(region_id));
    }

    #[test]
    fn test_flush_region_reply() {
        let region_id = RegionId::new(1024, 1);

        // Successful single region reply
        let success_reply = FlushRegionReply::success_single(region_id);
        assert!(success_reply.overall_success);
        assert_eq!(success_reply.results.len(), 1);
        assert_eq!(success_reply.results[0].0, region_id);
        assert!(success_reply.results[0].1.is_ok());

        // Failed single region reply
        let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
        assert!(!error_reply.overall_success);
        assert_eq!(error_reply.results.len(), 1);
        assert_eq!(error_reply.results[0].0, region_id);
        assert!(error_reply.results[0].1.is_err());

        // Batch reply
        let region_id2 = RegionId::new(1024, 2);
        let results = vec![
            (region_id, Ok(())),
            (region_id2, Err("flush failed".to_string())),
        ];
        let batch_reply = FlushRegionReply::from_results(results);
        assert!(!batch_reply.overall_success);
        assert_eq!(batch_reply.results.len(), 2);

        // Conversion to SimpleReply
        let simple_reply = batch_reply.to_simple_reply();
        assert!(!simple_reply.result);
        assert!(simple_reply.error.is_some());
        assert!(simple_reply.error.unwrap().contains("flush failed"));
    }

    #[test]
    fn test_flush_region_reply_display_and_empty_batch() {
        // An empty batch has no failures and therefore counts as a success.
        let empty = FlushRegionReply::from_results(vec![]);
        assert!(empty.overall_success);
        assert_eq!(empty.to_string(), "(overall_success=true, results=[])");
        assert_eq!(
            empty.to_simple_reply(),
            SimpleReply {
                result: true,
                error: None,
            }
        );

        let region_id = RegionId::new(1024, 1);
        let reply = FlushRegionReply::error_single(region_id, "boom".to_string());
        let rendered = reply.to_string();
        assert!(rendered.starts_with("(overall_success=false, results=["));
        assert!(rendered.contains(&format!("{}:ERR(boom)", region_id)));
        assert!(rendered.ends_with("])"));
    }

    #[test]
    fn test_serialize_flush_regions_instruction() {
        let region_id = RegionId::new(1024, 1);
        let flush_regions = FlushRegions::sync_single(region_id);
        let instruction = Instruction::FlushRegions(flush_regions);

        let serialized = serde_json::to_string(&instruction).unwrap();
        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();

        match deserialized {
            Instruction::FlushRegions(fr) => {
                assert_eq!(fr.region_ids, vec![region_id]);
                assert_eq!(fr.strategy, FlushStrategy::Sync);
                assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
            }
            _ => panic!("Expected FlushRegions instruction"),
        }
    }

    #[test]
    fn test_serialize_flush_regions_batch_instruction() {
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
        let flush_regions =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let instruction = Instruction::FlushRegions(flush_regions);

        let serialized = serde_json::to_string(&instruction).unwrap();
        let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();

        match deserialized {
            Instruction::FlushRegions(fr) => {
                assert_eq!(fr.region_ids, region_ids);
                assert_eq!(fr.strategy, FlushStrategy::Sync);
                assert!(!fr.is_hint());
                assert!(fr.is_sync());
                assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
            }
            _ => panic!("Expected FlushRegions instruction"),
        }
    }
}