1use std::fmt::Display;
16
17use common_catalog::{format_full_table_name, format_schema_name};
18use common_procedure::StringKey;
19use store_api::storage::{RegionId, TableId};
20
21use crate::key::FlowId;
22
23const CATALOG_LOCK_PREFIX: &str = "__catalog_lock";
24const SCHEMA_LOCK_PREFIX: &str = "__schema_lock";
25const TABLE_LOCK_PREFIX: &str = "__table_lock";
26const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
27const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
28const REGION_LOCK_PREFIX: &str = "__region_lock";
29const FLOW_LOCK_PREFIX: &str = "__flow_lock";
30const REMOTE_WAL_LOCK_PREFIX: &str = "__remote_wal_lock";
31
32pub enum CatalogLock<'a> {
34 Read(&'a str),
35 Write(&'a str),
36}
37
38impl Display for CatalogLock<'_> {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 let key = match self {
41 CatalogLock::Read(s) => s,
42 CatalogLock::Write(s) => s,
43 };
44 write!(f, "{}/{}", CATALOG_LOCK_PREFIX, key)
45 }
46}
47
48impl From<CatalogLock<'_>> for StringKey {
49 fn from(value: CatalogLock) -> Self {
50 match value {
51 CatalogLock::Write(_) => StringKey::Exclusive(value.to_string()),
52 CatalogLock::Read(_) => StringKey::Share(value.to_string()),
53 }
54 }
55}
56
57pub enum SchemaLock {
59 Read(String),
60 Write(String),
61}
62
63impl SchemaLock {
64 pub fn read(catalog: &str, schema: &str) -> Self {
65 Self::Read(format_schema_name(catalog, schema))
66 }
67
68 pub fn write(catalog: &str, schema: &str) -> Self {
69 Self::Write(format_schema_name(catalog, schema))
70 }
71}
72
73impl Display for SchemaLock {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 let key = match self {
76 SchemaLock::Read(s) => s,
77 SchemaLock::Write(s) => s,
78 };
79 write!(f, "{}/{}", SCHEMA_LOCK_PREFIX, key)
80 }
81}
82
83impl From<SchemaLock> for StringKey {
84 fn from(value: SchemaLock) -> Self {
85 match value {
86 SchemaLock::Write(_) => StringKey::Exclusive(value.to_string()),
87 SchemaLock::Read(_) => StringKey::Share(value.to_string()),
88 }
89 }
90}
91
92pub enum TableNameLock {
94 Write(String),
95}
96
97impl TableNameLock {
98 pub fn new(catalog: &str, schema: &str, table: &str) -> Self {
99 Self::Write(format_full_table_name(catalog, schema, table))
100 }
101}
102
103impl Display for TableNameLock {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 let TableNameLock::Write(name) = self;
106 write!(f, "{}/{}", TABLE_NAME_LOCK_PREFIX, name)
107 }
108}
109
110impl From<TableNameLock> for StringKey {
111 fn from(value: TableNameLock) -> Self {
112 match value {
113 TableNameLock::Write(_) => StringKey::Exclusive(value.to_string()),
114 }
115 }
116}
117
118pub enum FlowNameLock {
120 Write(String),
121}
122
123impl FlowNameLock {
124 pub fn new(catalog: &str, flow_name: &str) -> Self {
125 Self::Write(format!("{catalog}.{flow_name}"))
126 }
127}
128
129impl Display for FlowNameLock {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 let FlowNameLock::Write(name) = self;
132 write!(f, "{}/{}", FLOW_NAME_LOCK_PREFIX, name)
133 }
134}
135
136impl From<FlowNameLock> for StringKey {
137 fn from(value: FlowNameLock) -> Self {
138 match value {
139 FlowNameLock::Write(_) => StringKey::Exclusive(value.to_string()),
140 }
141 }
142}
143
144pub enum TableLock {
149 Read(TableId),
150 Write(TableId),
151}
152
153impl Display for TableLock {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 let key = match self {
156 TableLock::Read(s) => s,
157 TableLock::Write(s) => s,
158 };
159 write!(f, "{}/{}", TABLE_LOCK_PREFIX, key)
160 }
161}
162
163impl From<TableLock> for StringKey {
164 fn from(value: TableLock) -> Self {
165 match value {
166 TableLock::Write(_) => StringKey::Exclusive(value.to_string()),
167 TableLock::Read(_) => StringKey::Share(value.to_string()),
168 }
169 }
170}
171
172pub enum RegionLock {
183 Read(RegionId),
184 Write(RegionId),
185}
186
187impl Display for RegionLock {
188 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 let key = match self {
190 RegionLock::Read(s) => s.as_u64(),
191 RegionLock::Write(s) => s.as_u64(),
192 };
193 write!(f, "{}/{}", REGION_LOCK_PREFIX, key)
194 }
195}
196
197impl From<RegionLock> for StringKey {
198 fn from(value: RegionLock) -> Self {
199 match value {
200 RegionLock::Write(_) => StringKey::Exclusive(value.to_string()),
201 RegionLock::Read(_) => StringKey::Share(value.to_string()),
202 }
203 }
204}
205
206pub enum FlowLock {
212 Read(FlowId),
213 Write(FlowId),
214}
215
216impl Display for FlowLock {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 let key = match self {
219 FlowLock::Read(s) => s,
220 FlowLock::Write(s) => s,
221 };
222 write!(f, "{}/{}", FLOW_LOCK_PREFIX, key)
223 }
224}
225
226impl From<FlowLock> for StringKey {
227 fn from(value: FlowLock) -> Self {
228 match value {
229 FlowLock::Write(_) => StringKey::Exclusive(value.to_string()),
230 FlowLock::Read(_) => StringKey::Share(value.to_string()),
231 }
232 }
233}
234
235pub enum RemoteWalLock {
237 Read(String),
238 Write(String),
239}
240
241impl Display for RemoteWalLock {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 let key = match self {
244 RemoteWalLock::Read(s) => s,
245 RemoteWalLock::Write(s) => s,
246 };
247 write!(f, "{}/{}", REMOTE_WAL_LOCK_PREFIX, key)
248 }
249}
250
251impl From<RemoteWalLock> for StringKey {
252 fn from(value: RemoteWalLock) -> Self {
253 match value {
254 RemoteWalLock::Write(_) => StringKey::Exclusive(value.to_string()),
255 RemoteWalLock::Read(_) => StringKey::Share(value.to_string()),
256 }
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use common_procedure::StringKey;
263
264 use crate::lock_key::*;
265
266 #[test]
267 fn test_lock_key() {
268 let string_key: StringKey = CatalogLock::Read("foo").into();
270 assert_eq!(
271 string_key,
272 StringKey::Share(format!("{}/{}", CATALOG_LOCK_PREFIX, "foo"))
273 );
274 let string_key: StringKey = CatalogLock::Write("foo").into();
275 assert_eq!(
276 string_key,
277 StringKey::Exclusive(format!("{}/{}", CATALOG_LOCK_PREFIX, "foo"))
278 );
279 let string_key: StringKey = SchemaLock::read("foo", "bar").into();
281 assert_eq!(
282 string_key,
283 StringKey::Share(format!("{}/{}", SCHEMA_LOCK_PREFIX, "foo.bar"))
284 );
285 let string_key: StringKey = SchemaLock::write("foo", "bar").into();
286 assert_eq!(
287 string_key,
288 StringKey::Exclusive(format!("{}/{}", SCHEMA_LOCK_PREFIX, "foo.bar"))
289 );
290 let string_key: StringKey = TableLock::Read(1024).into();
292 assert_eq!(
293 string_key,
294 StringKey::Share(format!("{}/{}", TABLE_LOCK_PREFIX, 1024))
295 );
296 let string_key: StringKey = TableLock::Write(1024).into();
297 assert_eq!(
298 string_key,
299 StringKey::Exclusive(format!("{}/{}", TABLE_LOCK_PREFIX, 1024))
300 );
301 let string_key: StringKey = TableNameLock::new("foo", "bar", "baz").into();
303 assert_eq!(
304 string_key,
305 StringKey::Exclusive(format!("{}/{}", TABLE_NAME_LOCK_PREFIX, "foo.bar.baz"))
306 );
307 let string_key: StringKey = FlowNameLock::new("foo", "baz").into();
309 assert_eq!(
310 string_key,
311 StringKey::Exclusive(format!("{}/{}", FLOW_NAME_LOCK_PREFIX, "foo.baz"))
312 );
313 let region_id = RegionId::new(1024, 1);
315 let string_key: StringKey = RegionLock::Read(region_id).into();
316 assert_eq!(
317 string_key,
318 StringKey::Share(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64()))
319 );
320 let string_key: StringKey = RegionLock::Write(region_id).into();
321 assert_eq!(
322 string_key,
323 StringKey::Exclusive(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64()))
324 );
325 let flow_id = 1024;
327 let string_key: StringKey = FlowLock::Read(flow_id).into();
328 assert_eq!(
329 string_key,
330 StringKey::Share(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
331 );
332 let string_key: StringKey = FlowLock::Write(flow_id).into();
333 assert_eq!(
334 string_key,
335 StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
336 );
337 let string_key: StringKey = RemoteWalLock::Read("foo".to_string()).into();
339 assert_eq!(
340 string_key,
341 StringKey::Share(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
342 );
343 let string_key: StringKey = RemoteWalLock::Write("foo".to_string()).into();
344 assert_eq!(
345 string_key,
346 StringKey::Exclusive(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
347 );
348 }
349}