1use std::collections::HashMap;
16use std::fmt;
17
18use common_telemetry::{debug, error, info, warn};
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::ResultExt;
22
23use crate::error::{Result, ToJsonSnafu};
24pub(crate) use crate::store::state_store::StateStoreRef;
25use crate::ProcedureId;
26
27pub mod poison_store;
28pub mod state_store;
29pub mod util;
30
31const PROC_PATH: &str = "procedure/";
33
34macro_rules! proc_path {
36 ($store: expr, $fmt:expr) => { format!("{}{}", $store.proc_path(), format_args!($fmt)) };
37 ($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) };
38}
39
40pub(crate) use proc_path;
41
42#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
44pub struct ProcedureMessage {
45 pub type_name: String,
48 pub data: String,
50 pub parent_id: Option<ProcedureId>,
52 pub step: u32,
54 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub error: Option<String>,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61pub struct ProcedureMessages {
62 pub messages: HashMap<ProcedureId, ProcedureMessage>,
64 pub rollback_messages: HashMap<ProcedureId, ProcedureMessage>,
66 pub finished_ids: Vec<ProcedureId>,
68}
69
70pub(crate) struct ProcedureStore {
72 proc_path: String,
73 store: StateStoreRef,
74}
75
76impl ProcedureStore {
77 pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
79 let proc_path = format!("{}{PROC_PATH}", parent_path);
80 info!("The procedure state store path is: {}", &proc_path);
81 ProcedureStore { proc_path, store }
82 }
83
84 #[inline]
85 pub(crate) fn proc_path(&self) -> &str {
86 &self.proc_path
87 }
88
89 pub(crate) async fn store_procedure(
91 &self,
92 procedure_id: ProcedureId,
93 step: u32,
94 type_name: String,
95 data: String,
96 parent_id: Option<ProcedureId>,
97 ) -> Result<()> {
98 let message = ProcedureMessage {
99 type_name,
100 data,
101 parent_id,
102 step,
103 error: None,
104 };
105 let key = ParsedKey {
106 prefix: &self.proc_path,
107 procedure_id,
108 step,
109 key_type: KeyType::Step,
110 }
111 .to_string();
112 let value = serde_json::to_string(&message).context(ToJsonSnafu)?;
113
114 self.store.put(&key, value.into_bytes()).await?;
115
116 Ok(())
117 }
118
119 pub(crate) async fn commit_procedure(
121 &self,
122 procedure_id: ProcedureId,
123 step: u32,
124 ) -> Result<()> {
125 let key = ParsedKey {
126 prefix: &self.proc_path,
127 procedure_id,
128 step,
129 key_type: KeyType::Commit,
130 }
131 .to_string();
132 self.store.put(&key, Vec::new()).await?;
133
134 Ok(())
135 }
136
137 pub(crate) async fn rollback_procedure(
139 &self,
140 procedure_id: ProcedureId,
141 message: ProcedureMessage,
142 ) -> Result<()> {
143 let key = ParsedKey {
144 prefix: &self.proc_path,
145 procedure_id,
146 step: message.step,
147 key_type: KeyType::Rollback,
148 }
149 .to_string();
150
151 self.store
152 .put(&key, serde_json::to_vec(&message).context(ToJsonSnafu)?)
153 .await?;
154
155 Ok(())
156 }
157
158 pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> {
160 let path = proc_path!(self, "{procedure_id}/");
161 let mut key_values = self.store.walk_top_down(&path).await?;
163 let mut step_keys = Vec::with_capacity(8);
165 let mut finish_keys = Vec::new();
166 while let Some((key_set, _)) = key_values.try_next().await? {
167 let key = key_set.key();
168 let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
169 warn!("Unknown key while deleting procedures, key: {}", key);
170 continue;
171 };
172 if curr_key.key_type == KeyType::Step {
173 step_keys.extend(key_set.keys());
174 } else {
175 finish_keys.extend(key_set.keys());
177 }
178 }
179
180 debug!(
181 "Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
182 procedure_id, step_keys, finish_keys
183 );
184 self.store.batch_delete(step_keys.as_slice()).await?;
186 self.store.batch_delete(finish_keys.as_slice()).await?;
188 self.store.delete(&path).await?;
190 Ok(())
194 }
195
196 pub(crate) async fn load_messages(&self) -> Result<ProcedureMessages> {
198 let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
200
201 let mut key_values = self.store.walk_top_down(&self.proc_path).await?;
203 while let Some((key_set, value)) = key_values.try_next().await? {
204 let key = key_set.key();
205 let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
206 warn!("Unknown key while loading procedures, key: {}", key);
207 continue;
208 };
209
210 if let Some(entry) = procedure_key_values.get_mut(&curr_key.procedure_id) {
211 if entry.0.step < curr_key.step {
212 entry.0 = curr_key;
213 entry.1 = value;
214 }
215 } else {
216 let _ = procedure_key_values.insert(curr_key.procedure_id, (curr_key, value));
217 }
218 }
219
220 let mut messages = HashMap::with_capacity(procedure_key_values.len());
221 let mut rollback_messages = HashMap::new();
222 let mut finished_ids = Vec::new();
223 for (procedure_id, (parsed_key, value)) in procedure_key_values {
224 match parsed_key.key_type {
225 KeyType::Step => {
226 let Some(message) = self.load_one_message(&parsed_key, &value) else {
227 continue;
230 };
231 let _ = messages.insert(procedure_id, message);
232 }
233 KeyType::Commit => {
234 finished_ids.push(procedure_id);
235 }
236 KeyType::Rollback => {
237 let Some(message) = self.load_one_message(&parsed_key, &value) else {
238 continue;
241 };
242 let _ = rollback_messages.insert(procedure_id, message);
243 }
244 }
245 }
246
247 Ok(ProcedureMessages {
248 messages,
249 rollback_messages,
250 finished_ids,
251 })
252 }
253
254 fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
255 serde_json::from_slice(value)
256 .map_err(|e| {
257 error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
259 e
260 })
261 .ok()
262 }
263}
264
265#[derive(Debug, PartialEq, Eq)]
267enum KeyType {
268 Step,
269 Commit,
270 Rollback,
271}
272
273impl KeyType {
274 fn as_str(&self) -> &'static str {
275 match self {
276 KeyType::Step => "step",
277 KeyType::Commit => "commit",
278 KeyType::Rollback => "rollback",
279 }
280 }
281
282 fn from_str(s: &str) -> Option<KeyType> {
283 match s {
284 "step" => Some(KeyType::Step),
285 "commit" => Some(KeyType::Commit),
286 "rollback" => Some(KeyType::Rollback),
287 _ => None,
288 }
289 }
290}
291
292#[derive(Debug, PartialEq, Eq)]
294struct ParsedKey<'a> {
295 prefix: &'a str,
296 procedure_id: ProcedureId,
297 step: u32,
298 key_type: KeyType,
299}
300
301impl fmt::Display for ParsedKey<'_> {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 write!(
304 f,
305 "{}{}/{:010}.{}",
306 self.prefix,
307 self.procedure_id,
308 self.step,
309 self.key_type.as_str(),
310 )
311 }
312}
313
314impl<'a> ParsedKey<'a> {
315 fn parse_str(prefix: &'a str, input: &str) -> Option<ParsedKey<'a>> {
317 let input = input.strip_prefix(prefix)?;
318 let mut iter = input.rsplit('/');
319 let name = iter.next()?;
320 let id_str = iter.next()?;
321
322 let procedure_id = ProcedureId::parse_str(id_str).ok()?;
323
324 let mut parts = name.split('.');
325 let step_str = parts.next()?;
326 let suffix = parts.next()?;
327 let key_type = KeyType::from_str(suffix)?;
328 let step = step_str.parse().ok()?;
329
330 Some(ParsedKey {
331 prefix,
332 procedure_id,
333 step,
334 key_type,
335 })
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use std::sync::Arc;
342
343 use object_store::ObjectStore;
344
345 use crate::procedure::PoisonKeys;
346 use crate::store::state_store::ObjectStateStore;
347 use crate::BoxedProcedure;
348
349 impl ProcedureStore {
350 pub(crate) fn from_object_store(store: ObjectStore) -> ProcedureStore {
351 let state_store = ObjectStateStore::new(store);
352
353 ProcedureStore::new("data/", Arc::new(state_store))
354 }
355 }
356
357 use async_trait::async_trait;
358 use common_test_util::temp_dir::{create_temp_dir, TempDir};
359 use object_store::services::Fs as Builder;
360
361 use super::*;
362 use crate::{Context, LockKey, Procedure, Status};
363
364 fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
365 let store_dir = dir.path().to_str().unwrap();
366 let builder = Builder::default().root(store_dir);
367 let object_store = ObjectStore::new(builder).unwrap().finish();
368
369 ProcedureStore::from_object_store(object_store)
370 }
371
372 #[test]
373 fn test_parsed_key() {
374 let dir = create_temp_dir("store_procedure");
375 let store = procedure_store_for_test(&dir);
376
377 let procedure_id = ProcedureId::random();
378 let key = ParsedKey {
379 prefix: &store.proc_path,
380 procedure_id,
381 step: 2,
382 key_type: KeyType::Step,
383 };
384 assert_eq!(
385 proc_path!(store, "{procedure_id}/0000000002.step"),
386 key.to_string()
387 );
388 assert_eq!(
389 key,
390 ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
391 );
392
393 let key = ParsedKey {
394 prefix: &store.proc_path,
395 procedure_id,
396 step: 2,
397 key_type: KeyType::Commit,
398 };
399 assert_eq!(
400 proc_path!(store, "{procedure_id}/0000000002.commit"),
401 key.to_string()
402 );
403 assert_eq!(
404 key,
405 ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
406 );
407
408 let key = ParsedKey {
409 prefix: &store.proc_path,
410 procedure_id,
411 step: 2,
412 key_type: KeyType::Rollback,
413 };
414 assert_eq!(
415 proc_path!(store, "{procedure_id}/0000000002.rollback"),
416 key.to_string()
417 );
418 assert_eq!(
419 key,
420 ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
421 );
422 }
423
424 #[test]
425 fn test_parse_invalid_key() {
426 let dir = create_temp_dir("store_procedure");
427 let store = procedure_store_for_test(&dir);
428
429 assert!(ParsedKey::parse_str(&store.proc_path, "").is_none());
430 assert!(ParsedKey::parse_str(&store.proc_path, "invalidprefix").is_none());
431 assert!(ParsedKey::parse_str(&store.proc_path, "procedu/0000000003.step").is_none());
432 assert!(ParsedKey::parse_str(&store.proc_path, "procedure-0000000003.step").is_none());
433
434 let procedure_id = ProcedureId::random();
435 let input = proc_path!(store, "{procedure_id}");
436 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
437
438 let input = proc_path!(store, "{procedure_id}");
439 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
440
441 let input = proc_path!(store, "{procedure_id}/0000000003");
442 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
443
444 let input = proc_path!(store, "{procedure_id}/0000000003.");
445 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
446
447 let input = proc_path!(store, "{procedure_id}/0000000003.other");
448 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
449
450 assert!(ParsedKey::parse_str(&store.proc_path, "12345/0000000003.step").is_none());
451
452 let input = proc_path!(store, "{procedure_id}-0000000003.commit");
453 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
454 }
455
456 #[test]
457 fn test_procedure_message() {
458 let mut message = ProcedureMessage {
459 type_name: "TestMessage".to_string(),
460 data: "no parent id".to_string(),
461 parent_id: None,
462 step: 4,
463 error: None,
464 };
465
466 let json = serde_json::to_string(&message).unwrap();
467 assert_eq!(
468 json,
469 r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null,"step":4}"#
470 );
471
472 let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap();
473 message.parent_id = Some(procedure_id);
474 let json = serde_json::to_string(&message).unwrap();
475 assert_eq!(
476 json,
477 r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1","step":4}"#
478 );
479 }
480
481 struct MockProcedure {
482 data: String,
483 }
484
485 impl MockProcedure {
486 fn new(data: impl Into<String>) -> MockProcedure {
487 MockProcedure { data: data.into() }
488 }
489 }
490
491 #[async_trait]
492 impl Procedure for MockProcedure {
493 fn type_name(&self) -> &str {
494 "MockProcedure"
495 }
496
497 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
498 unimplemented!()
499 }
500
501 fn dump(&self) -> Result<String> {
502 Ok(self.data.clone())
503 }
504
505 fn lock_key(&self) -> LockKey {
506 LockKey::default()
507 }
508
509 fn poison_keys(&self) -> PoisonKeys {
510 PoisonKeys::default()
511 }
512 }
513
514 #[tokio::test]
515 async fn test_store_procedure() {
516 let dir = create_temp_dir("store_procedure");
517 let store = procedure_store_for_test(&dir);
518
519 let procedure_id = ProcedureId::random();
520 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
521 let type_name = procedure.type_name().to_string();
522 let data = procedure.dump().unwrap();
523 store
524 .store_procedure(procedure_id, 0, type_name, data, None)
525 .await
526 .unwrap();
527
528 let ProcedureMessages {
529 messages,
530 rollback_messages,
531 finished_ids,
532 } = store.load_messages().await.unwrap();
533 assert_eq!(1, messages.len());
534 assert!(rollback_messages.is_empty());
535 assert!(finished_ids.is_empty());
536 let msg = messages.get(&procedure_id).unwrap();
537 let expect = ProcedureMessage {
538 type_name: "MockProcedure".to_string(),
539 data: "test store procedure".to_string(),
540 parent_id: None,
541 step: 0,
542 error: None,
543 };
544 assert_eq!(expect, *msg);
545 }
546
547 #[tokio::test]
548 async fn test_commit_procedure() {
549 let dir = create_temp_dir("commit_procedure");
550 let store = procedure_store_for_test(&dir);
551
552 let procedure_id = ProcedureId::random();
553 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
554 let type_name = procedure.type_name().to_string();
555 let data = procedure.dump().unwrap();
556 store
557 .store_procedure(procedure_id, 0, type_name, data, None)
558 .await
559 .unwrap();
560 store.commit_procedure(procedure_id, 1).await.unwrap();
561
562 let ProcedureMessages {
563 messages,
564 rollback_messages,
565 finished_ids,
566 } = store.load_messages().await.unwrap();
567 assert!(messages.is_empty());
568 assert!(rollback_messages.is_empty());
569 assert_eq!(&[procedure_id], &finished_ids[..]);
570 }
571
572 #[tokio::test]
573 async fn test_rollback_procedure() {
574 let dir = create_temp_dir("rollback_procedure");
575 let store = procedure_store_for_test(&dir);
576
577 let procedure_id = ProcedureId::random();
578 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
579 let type_name = procedure.type_name().to_string();
580 let data = procedure.dump().unwrap();
581 store
582 .store_procedure(
583 procedure_id,
584 0,
585 type_name.to_string(),
586 data.to_string(),
587 None,
588 )
589 .await
590 .unwrap();
591 let message = ProcedureMessage {
592 type_name,
593 data,
594 parent_id: None,
595 step: 1,
596 error: None,
597 };
598 store
599 .rollback_procedure(procedure_id, message)
600 .await
601 .unwrap();
602
603 let ProcedureMessages {
604 messages,
605 rollback_messages,
606 finished_ids,
607 } = store.load_messages().await.unwrap();
608 assert!(messages.is_empty());
609 assert_eq!(1, rollback_messages.len());
610 assert!(finished_ids.is_empty());
611 assert!(rollback_messages.contains_key(&procedure_id));
612 }
613
614 #[tokio::test]
615 async fn test_delete_procedure() {
616 let dir = create_temp_dir("delete_procedure");
617 let store = procedure_store_for_test(&dir);
618
619 let procedure_id = ProcedureId::random();
620 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
621 let type_name = procedure.type_name().to_string();
622 let data = procedure.dump().unwrap();
623 store
624 .store_procedure(procedure_id, 0, type_name, data, None)
625 .await
626 .unwrap();
627 let type_name = procedure.type_name().to_string();
628 let data = procedure.dump().unwrap();
629 store
630 .store_procedure(procedure_id, 1, type_name, data, None)
631 .await
632 .unwrap();
633
634 store.delete_procedure(procedure_id).await.unwrap();
635
636 let ProcedureMessages {
637 messages,
638 rollback_messages,
639 finished_ids,
640 } = store.load_messages().await.unwrap();
641 assert!(messages.is_empty());
642 assert!(rollback_messages.is_empty());
643 assert!(finished_ids.is_empty());
644 }
645
646 #[tokio::test]
647 async fn test_delete_committed_procedure() {
648 let dir = create_temp_dir("delete_committed");
649 let store = procedure_store_for_test(&dir);
650
651 let procedure_id = ProcedureId::random();
652 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
653
654 let type_name = procedure.type_name().to_string();
655 let data = procedure.dump().unwrap();
656 store
657 .store_procedure(procedure_id, 0, type_name, data, None)
658 .await
659 .unwrap();
660
661 let type_name = procedure.type_name().to_string();
662 let data = procedure.dump().unwrap();
663 store
664 .store_procedure(procedure_id, 1, type_name, data, None)
665 .await
666 .unwrap();
667 store.commit_procedure(procedure_id, 2).await.unwrap();
668
669 store.delete_procedure(procedure_id).await.unwrap();
670
671 let ProcedureMessages {
672 messages,
673 rollback_messages,
674 finished_ids,
675 } = store.load_messages().await.unwrap();
676 assert!(messages.is_empty());
677 assert!(rollback_messages.is_empty());
678 assert!(finished_ids.is_empty());
679 }
680
681 #[tokio::test]
682 async fn test_load_messages() {
683 let dir = create_temp_dir("load_messages");
684 let store = procedure_store_for_test(&dir);
685
686 let id0 = ProcedureId::random();
688 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-0"));
689 let type_name = procedure.type_name().to_string();
690 let data = procedure.dump().unwrap();
691 store
692 .store_procedure(id0, 0, type_name, data, None)
693 .await
694 .unwrap();
695 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-1"));
696 let type_name = procedure.type_name().to_string();
697 let data = procedure.dump().unwrap();
698 store
699 .store_procedure(id0, 1, type_name, data, None)
700 .await
701 .unwrap();
702 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-2"));
703 let type_name = procedure.type_name().to_string();
704 let data = procedure.dump().unwrap();
705 store
706 .store_procedure(id0, 2, type_name, data, None)
707 .await
708 .unwrap();
709
710 let id1 = ProcedureId::random();
712 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-0"));
713 let type_name = procedure.type_name().to_string();
714 let data = procedure.dump().unwrap();
715 store
716 .store_procedure(id1, 0, type_name, data, None)
717 .await
718 .unwrap();
719 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-1"));
720 let type_name = procedure.type_name().to_string();
721 let data = procedure.dump().unwrap();
722 store
723 .store_procedure(id1, 1, type_name, data, None)
724 .await
725 .unwrap();
726 store.commit_procedure(id1, 2).await.unwrap();
727
728 let id2 = ProcedureId::random();
730 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id2-0"));
731 let type_name = procedure.type_name().to_string();
732 let data = procedure.dump().unwrap();
733 store
734 .store_procedure(id2, 0, type_name, data, None)
735 .await
736 .unwrap();
737
738 let ProcedureMessages {
739 messages,
740 rollback_messages,
741 finished_ids,
742 } = store.load_messages().await.unwrap();
743 assert_eq!(2, messages.len());
744 assert!(rollback_messages.is_empty());
745 assert_eq!(1, finished_ids.len());
746
747 let msg = messages.get(&id0).unwrap();
748 assert_eq!("id0-2", msg.data);
749 let msg = messages.get(&id2).unwrap();
750 assert_eq!("id2-0", msg.data);
751 }
752}