common_meta/
lock_key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use common_catalog::{format_full_table_name, format_schema_name};
18use common_procedure::StringKey;
19use store_api::storage::{RegionId, TableId};
20
21use crate::key::FlowId;
22
/// Key prefix written by [CatalogLock]'s `Display` impl.
const CATALOG_LOCK_PREFIX: &str = "__catalog_lock";
/// Key prefix written by [SchemaLock]'s `Display` impl.
const SCHEMA_LOCK_PREFIX: &str = "__schema_lock";
/// Key prefix written by [TableLock]'s `Display` impl.
const TABLE_LOCK_PREFIX: &str = "__table_lock";
/// Key prefix written by [TableNameLock]'s `Display` impl.
const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
/// Key prefix written by [FlowNameLock]'s `Display` impl.
const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
/// Key prefix written by [RegionLock]'s `Display` impl.
const REGION_LOCK_PREFIX: &str = "__region_lock";
/// Key prefix written by [FlowLock]'s `Display` impl.
const FLOW_LOCK_PREFIX: &str = "__flow_lock";
/// Key prefix written by [RemoteWalLock]'s `Display` impl.
const REMOTE_WAL_LOCK_PREFIX: &str = "__remote_wal_lock";
31
/// [CatalogLock] acquires the lock on the tenant level.
///
/// Both variants render to the same key string; the variant only selects
/// whether the resulting [StringKey] is shared or exclusive.
pub enum CatalogLock<'a> {
    /// Converts into a shared key. The `&str` is presumably the catalog name.
    Read(&'a str),
    /// Converts into an exclusive key. The `&str` is presumably the catalog name.
    Write(&'a str),
}
37
38impl Display for CatalogLock<'_> {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        let key = match self {
41            CatalogLock::Read(s) => s,
42            CatalogLock::Write(s) => s,
43        };
44        write!(f, "{}/{}", CATALOG_LOCK_PREFIX, key)
45    }
46}
47
48impl From<CatalogLock<'_>> for StringKey {
49    fn from(value: CatalogLock) -> Self {
50        match value {
51            CatalogLock::Write(_) => StringKey::Exclusive(value.to_string()),
52            CatalogLock::Read(_) => StringKey::Share(value.to_string()),
53        }
54    }
55}
56
/// [SchemaLock] acquires the lock on the database level.
///
/// The wrapped `String` is the key suffix; [SchemaLock::read] and
/// [SchemaLock::write] build it with `format_schema_name(catalog, schema)`.
pub enum SchemaLock {
    /// Converts into a shared key.
    Read(String),
    /// Converts into an exclusive key.
    Write(String),
}
62
63impl SchemaLock {
64    pub fn read(catalog: &str, schema: &str) -> Self {
65        Self::Read(format_schema_name(catalog, schema))
66    }
67
68    pub fn write(catalog: &str, schema: &str) -> Self {
69        Self::Write(format_schema_name(catalog, schema))
70    }
71}
72
73impl Display for SchemaLock {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        let key = match self {
76            SchemaLock::Read(s) => s,
77            SchemaLock::Write(s) => s,
78        };
79        write!(f, "{}/{}", SCHEMA_LOCK_PREFIX, key)
80    }
81}
82
83impl From<SchemaLock> for StringKey {
84    fn from(value: SchemaLock) -> Self {
85        match value {
86            SchemaLock::Write(_) => StringKey::Exclusive(value.to_string()),
87            SchemaLock::Read(_) => StringKey::Share(value.to_string()),
88        }
89    }
90}
91
/// [TableNameLock] prevents any procedures from trying to create a table with this name.
///
/// There is only a write variant, so the lock always converts into an
/// exclusive [StringKey].
pub enum TableNameLock {
    /// Holds the key suffix; [TableNameLock::new] builds it with
    /// `format_full_table_name(catalog, schema, table)`.
    Write(String),
}
96
97impl TableNameLock {
98    pub fn new(catalog: &str, schema: &str, table: &str) -> Self {
99        Self::Write(format_full_table_name(catalog, schema, table))
100    }
101}
102
103impl Display for TableNameLock {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        let TableNameLock::Write(name) = self;
106        write!(f, "{}/{}", TABLE_NAME_LOCK_PREFIX, name)
107    }
108}
109
110impl From<TableNameLock> for StringKey {
111    fn from(value: TableNameLock) -> Self {
112        match value {
113            TableNameLock::Write(_) => StringKey::Exclusive(value.to_string()),
114        }
115    }
116}
117
/// [FlowNameLock] prevents any procedures from trying to create a flow with this name.
///
/// There is only a write variant, so the lock always converts into an
/// exclusive [StringKey].
pub enum FlowNameLock {
    /// Holds the key suffix; [FlowNameLock::new] builds it as `<catalog>.<flow_name>`.
    Write(String),
}
122
123impl FlowNameLock {
124    pub fn new(catalog: &str, flow_name: &str) -> Self {
125        Self::Write(format!("{catalog}.{flow_name}"))
126    }
127}
128
129impl Display for FlowNameLock {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        let FlowNameLock::Write(name) = self;
132        write!(f, "{}/{}", FLOW_NAME_LOCK_PREFIX, name)
133    }
134}
135
136impl From<FlowNameLock> for StringKey {
137    fn from(value: FlowNameLock) -> Self {
138        match value {
139            FlowNameLock::Write(_) => StringKey::Exclusive(value.to_string()),
140        }
141    }
142}
143
/// [TableLock] acquires the lock on the table level.
///
/// Note: Allows to read/modify the corresponding table's [TableInfoValue](crate::key::table_info::TableInfoValue),
/// [TableRouteValue](crate::key::table_route::TableRouteValue), [DatanodeTableValue](crate::key::datanode_table::DatanodeTableValue).
pub enum TableLock {
    /// Converts into a shared key for the given table id.
    Read(TableId),
    /// Converts into an exclusive key for the given table id.
    Write(TableId),
}
152
153impl Display for TableLock {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        let key = match self {
156            TableLock::Read(s) => s,
157            TableLock::Write(s) => s,
158        };
159        write!(f, "{}/{}", TABLE_LOCK_PREFIX, key)
160    }
161}
162
163impl From<TableLock> for StringKey {
164    fn from(value: TableLock) -> Self {
165        match value {
166            TableLock::Write(_) => StringKey::Exclusive(value.to_string()),
167            TableLock::Read(_) => StringKey::Share(value.to_string()),
168        }
169    }
170}
171
/// [RegionLock] acquires the lock on the region level.
///
/// Note:
/// - Allows modification of the corresponding region's [TableRouteValue](crate::key::table_route::TableRouteValue),
///   [DatanodeTableValue](crate::key::datanode_table::DatanodeTableValue) even if
///   it acquires the [RegionLock::Write] only without acquiring the [TableLock::Write].
///
/// - Should acquire [TableLock] of the table in the same procedure.
///
/// TODO(weny): we should consider separating TableRouteValue into finer keys.
pub enum RegionLock {
    /// Converts into a shared key for the given region id.
    Read(RegionId),
    /// Converts into an exclusive key for the given region id.
    Write(RegionId),
}
186
187impl Display for RegionLock {
188    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189        let key = match self {
190            RegionLock::Read(s) => s.as_u64(),
191            RegionLock::Write(s) => s.as_u64(),
192        };
193        write!(f, "{}/{}", REGION_LOCK_PREFIX, key)
194    }
195}
196
197impl From<RegionLock> for StringKey {
198    fn from(value: RegionLock) -> Self {
199        match value {
200            RegionLock::Write(_) => StringKey::Exclusive(value.to_string()),
201            RegionLock::Read(_) => StringKey::Share(value.to_string()),
202        }
203    }
204}
205
/// [FlowLock] acquires the lock on the flow level.
///
/// Note: Allows to read/modify the corresponding flow's [FlowInfoValue](crate::key::flow::flow_info::FlowInfoValue),
/// [FlowNameValue](crate::key::flow::flow_name::FlowNameValue), [FlownodeFlowKey](crate::key::flow::flownode_flow::FlownodeFlowKey),
/// [TableFlowKey](crate::key::flow::table_flow::TableFlowKey).
pub enum FlowLock {
    /// Converts into a shared key for the given flow id.
    Read(FlowId),
    /// Converts into an exclusive key for the given flow id.
    Write(FlowId),
}
215
216impl Display for FlowLock {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        let key = match self {
219            FlowLock::Read(s) => s,
220            FlowLock::Write(s) => s,
221        };
222        write!(f, "{}/{}", FLOW_LOCK_PREFIX, key)
223    }
224}
225
226impl From<FlowLock> for StringKey {
227    fn from(value: FlowLock) -> Self {
228        match value {
229            FlowLock::Write(_) => StringKey::Exclusive(value.to_string()),
230            FlowLock::Read(_) => StringKey::Share(value.to_string()),
231        }
232    }
233}
234
/// [RemoteWalLock] acquires the lock on the remote wal topic level.
///
/// Both variants render to the same key string; the variant only selects
/// whether the resulting [StringKey] is shared or exclusive.
pub enum RemoteWalLock {
    /// Converts into a shared key. The `String` is presumably the topic name.
    Read(String),
    /// Converts into an exclusive key. The `String` is presumably the topic name.
    Write(String),
}
240
241impl Display for RemoteWalLock {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        let key = match self {
244            RemoteWalLock::Read(s) => s,
245            RemoteWalLock::Write(s) => s,
246        };
247        write!(f, "{}/{}", REMOTE_WAL_LOCK_PREFIX, key)
248    }
249}
250
251impl From<RemoteWalLock> for StringKey {
252    fn from(value: RemoteWalLock) -> Self {
253        match value {
254            RemoteWalLock::Write(_) => StringKey::Exclusive(value.to_string()),
255            RemoteWalLock::Read(_) => StringKey::Share(value.to_string()),
256        }
257    }
258}
259
#[cfg(test)]
mod tests {
    use common_procedure::StringKey;

    use crate::lock_key::*;

    /// Converts `lock` and checks that it became a shared key with text `expected`.
    #[track_caller]
    fn assert_share(lock: impl Into<StringKey>, expected: String) {
        let string_key: StringKey = lock.into();
        assert_eq!(string_key, StringKey::Share(expected));
    }

    /// Converts `lock` and checks that it became an exclusive key with text `expected`.
    #[track_caller]
    fn assert_exclusive(lock: impl Into<StringKey>, expected: String) {
        let string_key: StringKey = lock.into();
        assert_eq!(string_key, StringKey::Exclusive(expected));
    }

    #[test]
    fn test_lock_key() {
        // The catalog lock
        let expected = format!("{CATALOG_LOCK_PREFIX}/foo");
        assert_share(CatalogLock::Read("foo"), expected.clone());
        assert_exclusive(CatalogLock::Write("foo"), expected);

        // The schema lock
        let expected = format!("{SCHEMA_LOCK_PREFIX}/foo.bar");
        assert_share(SchemaLock::read("foo", "bar"), expected.clone());
        assert_exclusive(SchemaLock::write("foo", "bar"), expected);

        // The table lock
        let expected = format!("{TABLE_LOCK_PREFIX}/{}", 1024);
        assert_share(TableLock::Read(1024), expected.clone());
        assert_exclusive(TableLock::Write(1024), expected);

        // The table name lock
        assert_exclusive(
            TableNameLock::new("foo", "bar", "baz"),
            format!("{TABLE_NAME_LOCK_PREFIX}/foo.bar.baz"),
        );

        // The flow name lock
        assert_exclusive(
            FlowNameLock::new("foo", "baz"),
            format!("{FLOW_NAME_LOCK_PREFIX}/foo.baz"),
        );

        // The region lock
        let region_id = RegionId::new(1024, 1);
        let expected = format!("{REGION_LOCK_PREFIX}/{}", region_id.as_u64());
        assert_share(RegionLock::Read(region_id), expected.clone());
        assert_exclusive(RegionLock::Write(region_id), expected);

        // The flow lock
        let flow_id = 1024;
        let expected = format!("{FLOW_LOCK_PREFIX}/{flow_id}");
        assert_share(FlowLock::Read(flow_id), expected.clone());
        assert_exclusive(FlowLock::Write(flow_id), expected);

        // The remote wal lock
        let expected = format!("{REMOTE_WAL_LOCK_PREFIX}/foo");
        assert_share(RemoteWalLock::Read("foo".to_string()), expected.clone());
        assert_exclusive(RemoteWalLock::Write("foo".to_string()), expected);
    }
}