common_meta/
instruction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber};
21use strum::Display;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::flow_name::FlowName;
26use crate::key::schema_name::SchemaName;
27use crate::key::{FlowId, FlowPartitionId};
28use crate::peer::Peer;
29use crate::{DatanodeId, FlownodeId};
30
31#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
32pub struct RegionIdent {
33    pub datanode_id: DatanodeId,
34    pub table_id: TableId,
35    pub region_number: RegionNumber,
36    pub engine: String,
37}
38
39impl RegionIdent {
40    pub fn get_region_id(&self) -> RegionId {
41        RegionId::new(self.table_id, self.region_number)
42    }
43}
44
45impl Display for RegionIdent {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        write!(
48            f,
49            "RegionIdent(datanode_id='{}', table_id={}, region_number={}, engine = {})",
50            self.datanode_id, self.table_id, self.region_number, self.engine
51        )
52    }
53}
54
55/// The result of downgrade leader region.
56#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
57pub struct DowngradeRegionReply {
58    /// Returns the `last_entry_id` if available.
59    pub last_entry_id: Option<u64>,
60    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
61    pub metadata_last_entry_id: Option<u64>,
62    /// Indicates whether the region exists.
63    pub exists: bool,
64    /// Return error if any during the operation.
65    pub error: Option<String>,
66}
67
68impl Display for DowngradeRegionReply {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        write!(
71            f,
72            "(last_entry_id={:?}, exists={}, error={:?})",
73            self.last_entry_id, self.exists, self.error
74        )
75    }
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
79pub struct SimpleReply {
80    pub result: bool,
81    pub error: Option<String>,
82}
83
84impl Display for SimpleReply {
85    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
86        write!(f, "(result={}, error={:?})", self.result, self.error)
87    }
88}
89
90impl Display for OpenRegion {
91    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92        write!(
93            f,
94            "OpenRegion(region_ident={}, region_storage_path={})",
95            self.region_ident, self.region_storage_path
96        )
97    }
98}
99
100#[serde_with::serde_as]
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub struct OpenRegion {
103    pub region_ident: RegionIdent,
104    pub region_storage_path: String,
105    pub region_options: HashMap<String, String>,
106    #[serde(default)]
107    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
108    pub region_wal_options: HashMap<RegionNumber, String>,
109    #[serde(default)]
110    pub skip_wal_replay: bool,
111}
112
113impl OpenRegion {
114    pub fn new(
115        region_ident: RegionIdent,
116        path: &str,
117        region_options: HashMap<String, String>,
118        region_wal_options: HashMap<RegionNumber, String>,
119        skip_wal_replay: bool,
120    ) -> Self {
121        Self {
122            region_ident,
123            region_storage_path: path.to_string(),
124            region_options,
125            region_wal_options,
126            skip_wal_replay,
127        }
128    }
129}
130
131/// The instruction of downgrading leader region.
132#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
133pub struct DowngradeRegion {
134    /// The [RegionId].
135    pub region_id: RegionId,
136    /// The timeout of waiting for flush the region.
137    ///
138    /// `None` stands for don't flush before downgrading the region.
139    #[serde(default)]
140    pub flush_timeout: Option<Duration>,
141}
142
143impl Display for DowngradeRegion {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        write!(
146            f,
147            "DowngradeRegion(region_id={}, flush_timeout={:?})",
148            self.region_id, self.flush_timeout,
149        )
150    }
151}
152
153/// Upgrades a follower region to leader region.
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155pub struct UpgradeRegion {
156    /// The [RegionId].
157    pub region_id: RegionId,
158    /// The `last_entry_id` of old leader region.
159    pub last_entry_id: Option<u64>,
160    /// The `last_entry_id` of old leader metadata region (Only used for metric engine).
161    pub metadata_last_entry_id: Option<u64>,
162    /// The timeout of waiting for a wal replay.
163    ///
164    /// `None` stands for no wait,
165    /// it's helpful to verify whether the leader region is ready.
166    #[serde(with = "humantime_serde")]
167    pub replay_timeout: Option<Duration>,
168    /// The hint for replaying memtable.
169    #[serde(default)]
170    pub location_id: Option<u64>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
174/// The identifier of cache.
175pub enum CacheIdent {
176    FlowId(FlowId),
177    FlowName(FlowName),
178    TableId(TableId),
179    TableName(TableName),
180    SchemaName(SchemaName),
181    CreateFlow(CreateFlow),
182    DropFlow(DropFlow),
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
186pub struct CreateFlow {
187    /// The unique identifier for the flow.
188    pub flow_id: FlowId,
189    pub source_table_ids: Vec<TableId>,
190    /// Mapping of flow partition to peer information
191    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
195pub struct DropFlow {
196    pub flow_id: FlowId,
197    pub source_table_ids: Vec<TableId>,
198    /// Mapping of flow partition to flownode id
199    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
200}
201
202/// Flushes a batch of regions.
203#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
204pub struct FlushRegions {
205    pub region_ids: Vec<RegionId>,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
209pub enum Instruction {
210    /// Opens a region.
211    ///
212    /// - Returns true if a specified region exists.
213    OpenRegion(OpenRegion),
214    /// Closes a region.
215    ///
216    /// - Returns true if a specified region does not exist.
217    CloseRegion(RegionIdent),
218    /// Upgrades a region.
219    UpgradeRegion(UpgradeRegion),
220    /// Downgrades a region.
221    DowngradeRegion(DowngradeRegion),
222    /// Invalidates batch cache.
223    InvalidateCaches(Vec<CacheIdent>),
224    /// Flushes regions.
225    FlushRegions(FlushRegions),
226    /// Flushes a single region.
227    FlushRegion(RegionId),
228}
229
230/// The reply of [UpgradeRegion].
231#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
232pub struct UpgradeRegionReply {
233    /// Returns true if `last_entry_id` has been replayed to the latest.
234    pub ready: bool,
235    /// Indicates whether the region exists.
236    pub exists: bool,
237    /// Returns error if any.
238    pub error: Option<String>,
239}
240
241impl Display for UpgradeRegionReply {
242    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243        write!(
244            f,
245            "(ready={}, exists={}, error={:?})",
246            self.ready, self.exists, self.error
247        )
248    }
249}
250
251#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
252#[serde(tag = "type", rename_all = "snake_case")]
253pub enum InstructionReply {
254    OpenRegion(SimpleReply),
255    CloseRegion(SimpleReply),
256    UpgradeRegion(UpgradeRegionReply),
257    DowngradeRegion(DowngradeRegionReply),
258    FlushRegion(SimpleReply),
259}
260
261impl Display for InstructionReply {
262    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
263        match self {
264            Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
265            Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
266            Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
267            Self::DowngradeRegion(reply) => {
268                write!(f, "InstructionReply::DowngradeRegion({})", reply)
269            }
270            Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
271        }
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// The `mito2` region identifier shared by the tests in this module.
    fn mito2_region_ident() -> RegionIdent {
        RegionIdent {
            datanode_id: 2,
            table_id: 1024,
            region_number: 1,
            engine: "mito2".to_string(),
        }
    }

    /// Pins the JSON produced for the `OpenRegion` and `CloseRegion` instructions.
    #[test]
    fn test_serialize_instruction() {
        let open_region = Instruction::OpenRegion(OpenRegion::new(
            mito2_region_ident(),
            "test/foo",
            HashMap::new(),
            HashMap::new(),
            false,
        ));
        let open_region_json = serde_json::to_string(&open_region).unwrap();
        assert_eq!(
            r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
            open_region_json
        );

        let close_region = Instruction::CloseRegion(mito2_region_ident());
        let close_region_json = serde_json::to_string(&close_region).unwrap();
        assert_eq!(
            r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
            close_region_json
        );
    }

    /// An `OpenRegion` shape that lacks `region_wal_options` and `skip_wal_replay`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct LegacyOpenRegion {
        region_ident: RegionIdent,
        region_storage_path: String,
        region_options: HashMap<String, String>,
    }

    /// Checks that JSON written from [LegacyOpenRegion] still deserializes into [OpenRegion],
    /// with the missing fields taking their default values.
    #[test]
    fn test_compatible_serialize_open_region() {
        let region_storage_path = String::from("test/foo");
        let region_options: HashMap<String, String> = [("a", "aa"), ("b", "bb")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        // Serialize a legacy OpenRegion.
        let legacy_json = serde_json::to_string(&LegacyOpenRegion {
            region_ident: mito2_region_ident(),
            region_storage_path: region_storage_path.clone(),
            region_options: region_options.clone(),
        })
        .unwrap();

        // Deserialize to OpenRegion.
        let deserialized: OpenRegion = serde_json::from_str(&legacy_json).unwrap();
        let expected = OpenRegion {
            region_ident: mito2_region_ident(),
            region_storage_path,
            region_options,
            region_wal_options: HashMap::new(),
            skip_wal_replay: false,
        };
        assert_eq!(expected, deserialized);
    }
}
356}