common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::FlowId;
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// Returns the `last_entry_id` if available.
59    pub last_entry_id: Option<u64>,
60    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
61    pub metadata_last_entry_id: Option<u64>,
62    /// Indicates whether the region exists.
63    pub exists: bool,
64    /// Return error if any during the operation.
65    pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        write!(
71            f,
72            "(last_entry_id={:?}, exists={}, error={:?})",
73            self.last_entry_id, self.exists, self.error
74        )
75    }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80    pub result: bool,
81    pub error: Option<String>,
82}
83
84impl Display for SimpleReply {
85    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
86        write!(f, "(result={}, error={:?})", self.result, self.error)
87    }
88}
89
90impl Display for OpenRegion {
91    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92        write!(
93            f,
94            "OpenRegion(region_ident={}, region_storage_path={})",
95            self.region_ident, self.region_storage_path
96        )
97    }
98}
99
100#[serde_with::serde_as]
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub struct OpenRegion {
103    pub region_ident: RegionIdent,
104    pub region_storage_path: String,
105    pub region_options: HashMap<String, String>,
106    #[serde(default)]
107    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
108    pub region_wal_options: HashMap<RegionNumber, String>,
109    #[serde(default)]
110    pub skip_wal_replay: bool,
111}
112
113impl OpenRegion {
114    pub fn new(
115        region_ident: RegionIdent,
116        path: &str,
117        region_options: HashMap<String, String>,
118        region_wal_options: HashMap<RegionNumber, String>,
119        skip_wal_replay: bool,
120    ) -> Self {
121        Self {
122            region_ident,
123            region_storage_path: path.to_string(),
124            region_options,
125            region_wal_options,
126            skip_wal_replay,
127        }
128    }
129}
130
131/// The instruction of downgrading leader region.
132#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
133pub struct DowngradeRegion {
134    /// The [RegionId].
135    pub region_id: RegionId,
136    /// The timeout of waiting for flush the region.
137    ///
138    /// `None` stands for don't flush before downgrading the region.
139    #[serde(default)]
140    pub flush_timeout: Option<Duration>,
141}
142
143impl Display for DowngradeRegion {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        write!(
146            f,
147            "DowngradeRegion(region_id={}, flush_timeout={:?})",
148            self.region_id, self.flush_timeout,
149        )
150    }
151}
152
153/// Upgrades a follower region to leader region.
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155pub struct UpgradeRegion {
156    /// The [RegionId].
157    pub region_id: RegionId,
158    /// The `last_entry_id` of old leader region.
159    pub last_entry_id: Option<u64>,
160    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
161    pub metadata_last_entry_id: Option<u64>,
162    /// The timeout of waiting for a wal replay.
163    ///
164    /// `None` stands for no wait,
165    /// it's helpful to verify whether the leader region is ready.
166    #[serde(with = "humantime_serde")]
167    pub replay_timeout: Option<Duration>,
168    /// The hint for replaying memtable.
169    #[serde(default)]
170    pub location_id: Option<u64>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
174/// The identifier of cache.
175pub enum CacheIdent {
176    FlowId(FlowId),
177    FlowName(FlowName),
178    TableId(TableId),
179    TableName(TableName),
180    SchemaName(SchemaName),
181    CreateFlow(CreateFlow),
182    DropFlow(DropFlow),
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
186pub struct CreateFlow {
187    pub source_table_ids: Vec<TableId>,
188    pub flownodes: Vec<Peer>,
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
192pub struct DropFlow {
193    pub source_table_ids: Vec<TableId>,
194    pub flownode_ids: Vec<FlownodeId>,
195}
196
197/// Flushes a batch of regions.
198#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
199pub struct FlushRegions {
200    pub region_ids: Vec<RegionId>,
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
204pub enum Instruction {
205    /// Opens a region.
206    ///
207    /// - Returns true if a specified region exists.
208    OpenRegion(OpenRegion),
209    /// Closes a region.
210    ///
211    /// - Returns true if a specified region does not exist.
212    CloseRegion(RegionIdent),
213    /// Upgrades a region.
214    UpgradeRegion(UpgradeRegion),
215    /// Downgrades a region.
216    DowngradeRegion(DowngradeRegion),
217    /// Invalidates batch cache.
218    InvalidateCaches(Vec<CacheIdent>),
219    /// Flushes regions.
220    FlushRegions(FlushRegions),
221    /// Flushes a single region.
222    FlushRegion(RegionId),
223}
224
225/// The reply of [UpgradeRegion].
226#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
227pub struct UpgradeRegionReply {
228    /// Returns true if `last_entry_id` has been replayed to the latest.
229    pub ready: bool,
230    /// Indicates whether the region exists.
231    pub exists: bool,
232    /// Returns error if any.
233    pub error: Option<String>,
234}
235
236impl Display for UpgradeRegionReply {
237    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
238        write!(
239            f,
240            "(ready={}, exists={}, error={:?})",
241            self.ready, self.exists, self.error
242        )
243    }
244}
245
246#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
247#[serde(tag = "type", rename_all = "snake_case")]
248pub enum InstructionReply {
249    OpenRegion(SimpleReply),
250    CloseRegion(SimpleReply),
251    UpgradeRegion(UpgradeRegionReply),
252    DowngradeRegion(DowngradeRegionReply),
253    FlushRegion(SimpleReply),
254}
255
256impl Display for InstructionReply {
257    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
258        match self {
259            Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
260            Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
261            Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
262            Self::DowngradeRegion(reply) => {
263                write!(f, "InstructionReply::DowngradeRegion({})", reply)
264            }
265            Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
266        }
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273
274    #[test]
275    fn test_serialize_instruction() {
276        let open_region = Instruction::OpenRegion(OpenRegion::new(
277            RegionIdent {
278                datanode_id: 2,
279                table_id: 1024,
280                region_number: 1,
281                engine: "mito2".to_string(),
282            },
283            "test/foo",
284            HashMap::new(),
285            HashMap::new(),
286            false,
287        ));
288
289        let serialized = serde_json::to_string(&open_region).unwrap();
290
291        assert_eq!(
292            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
293            serialized
294        );
295
296        let close_region = Instruction::CloseRegion(RegionIdent {
297            datanode_id: 2,
298            table_id: 1024,
299            region_number: 1,
300            engine: "mito2".to_string(),
301        });
302
303        let serialized = serde_json::to_string(&close_region).unwrap();
304
305        assert_eq!(
306            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
307            serialized
308        );
309    }
310
311    #[derive(Debug, Clone, Serialize, Deserialize)]
312    struct LegacyOpenRegion {
313        region_ident: RegionIdent,
314        region_storage_path: String,
315        region_options: HashMap<String, String>,
316    }
317
318    #[test]
319    fn test_compatible_serialize_open_region() {
320        let region_ident = RegionIdent {
321            datanode_id: 2,
322            table_id: 1024,
323            region_number: 1,
324            engine: "mito2".to_string(),
325        };
326        let region_storage_path = "test/foo".to_string();
327        let region_options = HashMap::from([
328            ("a".to_string(), "aa".to_string()),
329            ("b".to_string(), "bb".to_string()),
330        ]);
331
332        // Serialize a legacy OpenRegion.
333        let legacy_open_region = LegacyOpenRegion {
334            region_ident: region_ident.clone(),
335            region_storage_path: region_storage_path.clone(),
336            region_options: region_options.clone(),
337        };
338        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
339
340        // Deserialize to OpenRegion.
341        let deserialized = serde_json::from_str(&serialized).unwrap();
342        let expected = OpenRegion {
343            region_ident,
344            region_storage_path,
345            region_options,
346            region_wal_options: HashMap::new(),
347            skip_wal_replay: false,
348        };
349        assert_eq!(expected, deserialized);
350    }
351}