common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// Returns the `last_entry_id` if available.
59    pub last_entry_id: Option<u64>,
60    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
61    pub metadata_last_entry_id: Option<u64>,
62    /// Indicates whether the region exists.
63    pub exists: bool,
64    /// Return error if any during the operation.
65    pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        write!(
71            f,
72            "(last_entry_id={:?}, exists={}, error={:?})",
73            self.last_entry_id, self.exists, self.error
74        )
75    }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80    pub result: bool,
81    pub error: Option<String>,
82}
83
84impl Display for SimpleReply {
85    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
86        write!(f, "(result={}, error={:?})", self.result, self.error)
87    }
88}
89
90impl Display for OpenRegion {
91    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92        write!(
93            f,
94            "OpenRegion(region_ident={}, region_storage_path={})",
95            self.region_ident, self.region_storage_path
96        )
97    }
98}
99
100#[serde_with::serde_as]
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub struct OpenRegion {
103    pub region_ident: RegionIdent,
104    pub region_storage_path: String,
105    pub region_options: HashMap<String, String>,
106    #[serde(default)]
107    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
108    pub region_wal_options: HashMap<RegionNumber, String>,
109    #[serde(default)]
110    pub skip_wal_replay: bool,
111}
112
113impl OpenRegion {
114    pub fn new(
115        region_ident: RegionIdent,
116        path: &str,
117        region_options: HashMap<String, String>,
118        region_wal_options: HashMap<RegionNumber, String>,
119        skip_wal_replay: bool,
120    ) -> Self {
121        Self {
122            region_ident,
123            region_storage_path: path.to_string(),
124            region_options,
125            region_wal_options,
126            skip_wal_replay,
127        }
128    }
129}
130
131/// The instruction of downgrading leader region.
132#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
133pub struct DowngradeRegion {
134    /// The [RegionId].
135    pub region_id: RegionId,
136    /// The timeout of waiting for flush the region.
137    ///
138    /// `None` stands for don't flush before downgrading the region.
139    #[serde(default)]
140    pub flush_timeout: Option<Duration>,
141}
142
143impl Display for DowngradeRegion {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        write!(
146            f,
147            "DowngradeRegion(region_id={}, flush_timeout={:?})",
148            self.region_id, self.flush_timeout,
149        )
150    }
151}
152
153/// Upgrades a follower region to leader region.
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155pub struct UpgradeRegion {
156    /// The [RegionId].
157    pub region_id: RegionId,
158    /// The `last_entry_id` of old leader region.
159    pub last_entry_id: Option<u64>,
160    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
161    pub metadata_last_entry_id: Option<u64>,
162    /// The timeout of waiting for a wal replay.
163    ///
164    /// `None` stands for no wait,
165    /// it's helpful to verify whether the leader region is ready.
166    #[serde(with = "humantime_serde")]
167    pub replay_timeout: Option<Duration>,
168    /// The hint for replaying memtable.
169    #[serde(default)]
170    pub location_id: Option<u64>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
174/// The identifier of cache.
175pub enum CacheIdent {
176    FlowId(FlowId),
177    /// Indicate change of address of flownode.
178    FlowNodeAddressChange(u64),
179    FlowName(FlowName),
180    TableId(TableId),
181    TableName(TableName),
182    SchemaName(SchemaName),
183    CreateFlow(CreateFlow),
184    DropFlow(DropFlow),
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
188pub struct CreateFlow {
189    /// The unique identifier for the flow.
190    pub flow_id: FlowId,
191    pub source_table_ids: Vec<TableId>,
192    /// Mapping of flow partition to peer information
193    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
197pub struct DropFlow {
198    pub flow_id: FlowId,
199    pub source_table_ids: Vec<TableId>,
200    /// Mapping of flow partition to flownode id
201    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
202}
203
204/// Flushes a batch of regions.
205#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
206pub struct FlushRegions {
207    pub region_ids: Vec<RegionId>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
211pub enum Instruction {
212    /// Opens a region.
213    ///
214    /// - Returns true if a specified region exists.
215    OpenRegion(OpenRegion),
216    /// Closes a region.
217    ///
218    /// - Returns true if a specified region does not exist.
219    CloseRegion(RegionIdent),
220    /// Upgrades a region.
221    UpgradeRegion(UpgradeRegion),
222    /// Downgrades a region.
223    DowngradeRegion(DowngradeRegion),
224    /// Invalidates batch cache.
225    InvalidateCaches(Vec<CacheIdent>),
226    /// Flushes regions.
227    FlushRegions(FlushRegions),
228    /// Flushes a single region.
229    FlushRegion(RegionId),
230}
231
232/// The reply of [UpgradeRegion].
233#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
234pub struct UpgradeRegionReply {
235    /// Returns true if `last_entry_id` has been replayed to the latest.
236    pub ready: bool,
237    /// Indicates whether the region exists.
238    pub exists: bool,
239    /// Returns error if any.
240    pub error: Option<String>,
241}
242
243impl Display for UpgradeRegionReply {
244    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
245        write!(
246            f,
247            "(ready={}, exists={}, error={:?})",
248            self.ready, self.exists, self.error
249        )
250    }
251}
252
253#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
254#[serde(tag = "type", rename_all = "snake_case")]
255pub enum InstructionReply {
256    OpenRegion(SimpleReply),
257    CloseRegion(SimpleReply),
258    UpgradeRegion(UpgradeRegionReply),
259    DowngradeRegion(DowngradeRegionReply),
260    FlushRegion(SimpleReply),
261}
262
263impl Display for InstructionReply {
264    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
265        match self {
266            Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
267            Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
268            Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
269            Self::DowngradeRegion(reply) => {
270                write!(f, "InstructionReply::DowngradeRegion({})", reply)
271            }
272            Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
273        }
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280
281    #[test]
282    fn test_serialize_instruction() {
283        let open_region = Instruction::OpenRegion(OpenRegion::new(
284            RegionIdent {
285                datanode_id: 2,
286                table_id: 1024,
287                region_number: 1,
288                engine: "mito2".to_string(),
289            },
290            "test/foo",
291            HashMap::new(),
292            HashMap::new(),
293            false,
294        ));
295
296        let serialized = serde_json::to_string(&open_region).unwrap();
297
298        assert_eq!(
299            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
300            serialized
301        );
302
303        let close_region = Instruction::CloseRegion(RegionIdent {
304            datanode_id: 2,
305            table_id: 1024,
306            region_number: 1,
307            engine: "mito2".to_string(),
308        });
309
310        let serialized = serde_json::to_string(&close_region).unwrap();
311
312        assert_eq!(
313            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
314            serialized
315        );
316    }
317
318    #[derive(Debug, Clone, Serialize, Deserialize)]
319    struct LegacyOpenRegion {
320        region_ident: RegionIdent,
321        region_storage_path: String,
322        region_options: HashMap<String, String>,
323    }
324
325    #[test]
326    fn test_compatible_serialize_open_region() {
327        let region_ident = RegionIdent {
328            datanode_id: 2,
329            table_id: 1024,
330            region_number: 1,
331            engine: "mito2".to_string(),
332        };
333        let region_storage_path = "test/foo".to_string();
334        let region_options = HashMap::from([
335            ("a".to_string(), "aa".to_string()),
336            ("b".to_string(), "bb".to_string()),
337        ]);
338
339        // Serialize a legacy OpenRegion.
340        let legacy_open_region = LegacyOpenRegion {
341            region_ident: region_ident.clone(),
342            region_storage_path: region_storage_path.clone(),
343            region_options: region_options.clone(),
344        };
345        let serialized = serde_json::to_string(&legacy_open_region).unwrap();
346
347        // Deserialize to OpenRegion.
348        let deserialized = serde_json::from_str(&serialized).unwrap();
349        let expected = OpenRegion {
350            region_ident,
351            region_storage_path,
352            region_options,
353            region_wal_options: HashMap::new(),
354            skip_wal_replay: false,
355        };
356        assert_eq!(expected, deserialized);
357    }
358}