1use std::collections::HashMap;
16use std::fmt;
17
18use common_telemetry::{debug, error, info, warn};
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::ResultExt;
22
23use crate::ProcedureId;
24use crate::error::{Result, ToJsonSnafu};
25pub(crate) use crate::store::state_store::StateStoreRef;
26
27pub mod poison_store;
28pub mod state_store;
29pub mod util;
30
31const PROC_PATH: &str = "procedure/";
33
34macro_rules! proc_path {
36 ($store: expr, $fmt:expr) => { format!("{}{}", $store.proc_path(), format_args!($fmt)) };
37 ($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) };
38}
39
40#[cfg(test)]
41pub(crate) use proc_path;
42
43#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45pub struct ProcedureMessage {
46 pub type_name: String,
49 pub data: String,
51 pub parent_id: Option<ProcedureId>,
53 pub step: u32,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub error: Option<String>,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct ProcedureMessages {
63 pub messages: HashMap<ProcedureId, ProcedureMessage>,
65 pub rollback_messages: HashMap<ProcedureId, ProcedureMessage>,
67 pub finished_ids: Vec<ProcedureId>,
69}
70
71pub(crate) struct ProcedureStore {
73 proc_path: String,
74 store: StateStoreRef,
75}
76
77impl ProcedureStore {
78 pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
80 let proc_path = format!("{}{PROC_PATH}", parent_path);
81 info!("The procedure state store path is: {}", &proc_path);
82 ProcedureStore { proc_path, store }
83 }
84
85 #[inline]
86 pub(crate) fn proc_path(&self) -> &str {
87 &self.proc_path
88 }
89
90 pub(crate) async fn store_procedure(
92 &self,
93 procedure_id: ProcedureId,
94 step: u32,
95 type_name: String,
96 data: String,
97 parent_id: Option<ProcedureId>,
98 ) -> Result<()> {
99 let message = ProcedureMessage {
100 type_name,
101 data,
102 parent_id,
103 step,
104 error: None,
105 };
106 let key = ParsedKey {
107 prefix: &self.proc_path,
108 procedure_id,
109 step,
110 key_type: KeyType::Step,
111 }
112 .to_string();
113 let value = serde_json::to_string(&message).context(ToJsonSnafu)?;
114
115 self.store.put(&key, value.into_bytes()).await?;
116
117 Ok(())
118 }
119
120 pub(crate) async fn commit_procedure(
122 &self,
123 procedure_id: ProcedureId,
124 step: u32,
125 ) -> Result<()> {
126 let key = ParsedKey {
127 prefix: &self.proc_path,
128 procedure_id,
129 step,
130 key_type: KeyType::Commit,
131 }
132 .to_string();
133 self.store.put(&key, Vec::new()).await?;
134
135 Ok(())
136 }
137
138 pub(crate) async fn rollback_procedure(
140 &self,
141 procedure_id: ProcedureId,
142 message: ProcedureMessage,
143 ) -> Result<()> {
144 let key = ParsedKey {
145 prefix: &self.proc_path,
146 procedure_id,
147 step: message.step,
148 key_type: KeyType::Rollback,
149 }
150 .to_string();
151
152 self.store
153 .put(&key, serde_json::to_vec(&message).context(ToJsonSnafu)?)
154 .await?;
155
156 Ok(())
157 }
158
159 pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> {
161 let path = proc_path!(self, "{procedure_id}/");
162 let mut key_values = self.store.walk_top_down(&path).await?;
164 let mut step_keys = Vec::with_capacity(8);
166 let mut finish_keys = Vec::new();
167 while let Some((key_set, _)) = key_values.try_next().await? {
168 let key = key_set.key();
169 let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
170 warn!("Unknown key while deleting procedures, key: {}", key);
171 continue;
172 };
173 if curr_key.key_type == KeyType::Step {
174 step_keys.extend(key_set.keys());
175 } else {
176 finish_keys.extend(key_set.keys());
178 }
179 }
180
181 debug!(
182 "Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
183 procedure_id, step_keys, finish_keys
184 );
185 self.store.batch_delete(step_keys.as_slice()).await?;
187 self.store.batch_delete(finish_keys.as_slice()).await?;
189 self.store.delete(&path).await?;
191 Ok(())
195 }
196
197 pub(crate) async fn load_messages(&self) -> Result<ProcedureMessages> {
199 let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
201
202 let mut key_values = self.store.walk_top_down(&self.proc_path).await?;
204 while let Some((key_set, value)) = key_values.try_next().await? {
205 let key = key_set.key();
206 let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
207 warn!("Unknown key while loading procedures, key: {}", key);
208 continue;
209 };
210
211 if let Some(entry) = procedure_key_values.get_mut(&curr_key.procedure_id) {
212 if entry.0.step < curr_key.step {
213 entry.0 = curr_key;
214 entry.1 = value;
215 }
216 } else {
217 let _ = procedure_key_values.insert(curr_key.procedure_id, (curr_key, value));
218 }
219 }
220
221 let mut messages = HashMap::with_capacity(procedure_key_values.len());
222 let mut rollback_messages = HashMap::new();
223 let mut finished_ids = Vec::new();
224 for (procedure_id, (parsed_key, value)) in procedure_key_values {
225 match parsed_key.key_type {
226 KeyType::Step => {
227 let Some(message) = self.load_one_message(&parsed_key, &value) else {
228 continue;
231 };
232 let _ = messages.insert(procedure_id, message);
233 }
234 KeyType::Commit => {
235 finished_ids.push(procedure_id);
236 }
237 KeyType::Rollback => {
238 let Some(message) = self.load_one_message(&parsed_key, &value) else {
239 continue;
242 };
243 let _ = rollback_messages.insert(procedure_id, message);
244 }
245 }
246 }
247
248 Ok(ProcedureMessages {
249 messages,
250 rollback_messages,
251 finished_ids,
252 })
253 }
254
255 fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
256 serde_json::from_slice(value)
257 .map_err(|e| {
258 error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
260 e
261 })
262 .ok()
263 }
264}
265
266#[derive(Debug, PartialEq, Eq)]
268enum KeyType {
269 Step,
270 Commit,
271 Rollback,
272}
273
274impl KeyType {
275 fn as_str(&self) -> &'static str {
276 match self {
277 KeyType::Step => "step",
278 KeyType::Commit => "commit",
279 KeyType::Rollback => "rollback",
280 }
281 }
282
283 fn from_str(s: &str) -> Option<KeyType> {
284 match s {
285 "step" => Some(KeyType::Step),
286 "commit" => Some(KeyType::Commit),
287 "rollback" => Some(KeyType::Rollback),
288 _ => None,
289 }
290 }
291}
292
293#[derive(Debug, PartialEq, Eq)]
295struct ParsedKey<'a> {
296 prefix: &'a str,
297 procedure_id: ProcedureId,
298 step: u32,
299 key_type: KeyType,
300}
301
302impl fmt::Display for ParsedKey<'_> {
303 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304 write!(
305 f,
306 "{}{}/{:010}.{}",
307 self.prefix,
308 self.procedure_id,
309 self.step,
310 self.key_type.as_str(),
311 )
312 }
313}
314
315impl<'a> ParsedKey<'a> {
316 fn parse_str(prefix: &'a str, input: &str) -> Option<ParsedKey<'a>> {
318 let input = input.strip_prefix(prefix)?;
319 let mut iter = input.rsplit('/');
320 let name = iter.next()?;
321 let id_str = iter.next()?;
322
323 let procedure_id = ProcedureId::parse_str(id_str).ok()?;
324
325 let mut parts = name.split('.');
326 let step_str = parts.next()?;
327 let suffix = parts.next()?;
328 let key_type = KeyType::from_str(suffix)?;
329 let step = step_str.parse().ok()?;
330
331 Some(ParsedKey {
332 prefix,
333 procedure_id,
334 step,
335 key_type,
336 })
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 use std::sync::Arc;
343
344 use object_store::ObjectStore;
345
346 use crate::BoxedProcedure;
347 use crate::procedure::PoisonKeys;
348 use crate::store::state_store::ObjectStateStore;
349
350 impl ProcedureStore {
351 pub(crate) fn from_object_store(store: ObjectStore) -> ProcedureStore {
352 let state_store = ObjectStateStore::new(store);
353
354 ProcedureStore::new("data/", Arc::new(state_store))
355 }
356 }
357
358 use async_trait::async_trait;
359 use common_test_util::temp_dir::{TempDir, create_temp_dir};
360 use object_store::services::Fs as Builder;
361
362 use super::*;
363 use crate::{Context, LockKey, Procedure, Status};
364
365 fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
366 let store_dir = dir.path().to_str().unwrap();
367 let builder = Builder::default().root(store_dir);
368 let object_store = ObjectStore::new(builder).unwrap().finish();
369
370 ProcedureStore::from_object_store(object_store)
371 }
372
373 #[test]
374 fn test_parsed_key() {
375 let dir = create_temp_dir("store_procedure");
376 let store = procedure_store_for_test(&dir);
377
378 let procedure_id = ProcedureId::random();
379 let key = ParsedKey {
380 prefix: &store.proc_path,
381 procedure_id,
382 step: 2,
383 key_type: KeyType::Step,
384 };
385 assert_eq!(
386 proc_path!(store, "{procedure_id}/0000000002.step"),
387 key.to_string()
388 );
389 assert_eq!(
390 key,
391 ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
392 );
393
394 let key = ParsedKey {
395 prefix: &store.proc_path,
396 procedure_id,
397 step: 2,
398 key_type: KeyType::Commit,
399 };
400 assert_eq!(
401 proc_path!(store, "{procedure_id}/0000000002.commit"),
402 key.to_string()
403 );
404 assert_eq!(
405 key,
406 ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
407 );
408
409 let key = ParsedKey {
410 prefix: &store.proc_path,
411 procedure_id,
412 step: 2,
413 key_type: KeyType::Rollback,
414 };
415 assert_eq!(
416 proc_path!(store, "{procedure_id}/0000000002.rollback"),
417 key.to_string()
418 );
419 assert_eq!(
420 key,
421 ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
422 );
423 }
424
425 #[test]
426 fn test_parse_invalid_key() {
427 let dir = create_temp_dir("store_procedure");
428 let store = procedure_store_for_test(&dir);
429
430 assert!(ParsedKey::parse_str(&store.proc_path, "").is_none());
431 assert!(ParsedKey::parse_str(&store.proc_path, "invalidprefix").is_none());
432 assert!(ParsedKey::parse_str(&store.proc_path, "procedu/0000000003.step").is_none());
433 assert!(ParsedKey::parse_str(&store.proc_path, "procedure-0000000003.step").is_none());
434
435 let procedure_id = ProcedureId::random();
436 let input = proc_path!(store, "{procedure_id}");
437 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
438
439 let input = proc_path!(store, "{procedure_id}");
440 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
441
442 let input = proc_path!(store, "{procedure_id}/0000000003");
443 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
444
445 let input = proc_path!(store, "{procedure_id}/0000000003.");
446 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
447
448 let input = proc_path!(store, "{procedure_id}/0000000003.other");
449 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
450
451 assert!(ParsedKey::parse_str(&store.proc_path, "12345/0000000003.step").is_none());
452
453 let input = proc_path!(store, "{procedure_id}-0000000003.commit");
454 assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
455 }
456
457 #[test]
458 fn test_procedure_message() {
459 let mut message = ProcedureMessage {
460 type_name: "TestMessage".to_string(),
461 data: "no parent id".to_string(),
462 parent_id: None,
463 step: 4,
464 error: None,
465 };
466
467 let json = serde_json::to_string(&message).unwrap();
468 assert_eq!(
469 json,
470 r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null,"step":4}"#
471 );
472
473 let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap();
474 message.parent_id = Some(procedure_id);
475 let json = serde_json::to_string(&message).unwrap();
476 assert_eq!(
477 json,
478 r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1","step":4}"#
479 );
480 }
481
482 struct MockProcedure {
483 data: String,
484 }
485
486 impl MockProcedure {
487 fn new(data: impl Into<String>) -> MockProcedure {
488 MockProcedure { data: data.into() }
489 }
490 }
491
492 #[async_trait]
493 impl Procedure for MockProcedure {
494 fn type_name(&self) -> &str {
495 "MockProcedure"
496 }
497
498 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
499 unimplemented!()
500 }
501
502 fn dump(&self) -> Result<String> {
503 Ok(self.data.clone())
504 }
505
506 fn lock_key(&self) -> LockKey {
507 LockKey::default()
508 }
509
510 fn poison_keys(&self) -> PoisonKeys {
511 PoisonKeys::default()
512 }
513 }
514
515 #[tokio::test]
516 async fn test_store_procedure() {
517 let dir = create_temp_dir("store_procedure");
518 let store = procedure_store_for_test(&dir);
519
520 let procedure_id = ProcedureId::random();
521 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
522 let type_name = procedure.type_name().to_string();
523 let data = procedure.dump().unwrap();
524 store
525 .store_procedure(procedure_id, 0, type_name, data, None)
526 .await
527 .unwrap();
528
529 let ProcedureMessages {
530 messages,
531 rollback_messages,
532 finished_ids,
533 } = store.load_messages().await.unwrap();
534 assert_eq!(1, messages.len());
535 assert!(rollback_messages.is_empty());
536 assert!(finished_ids.is_empty());
537 let msg = messages.get(&procedure_id).unwrap();
538 let expect = ProcedureMessage {
539 type_name: "MockProcedure".to_string(),
540 data: "test store procedure".to_string(),
541 parent_id: None,
542 step: 0,
543 error: None,
544 };
545 assert_eq!(expect, *msg);
546 }
547
548 #[tokio::test]
549 async fn test_commit_procedure() {
550 let dir = create_temp_dir("commit_procedure");
551 let store = procedure_store_for_test(&dir);
552
553 let procedure_id = ProcedureId::random();
554 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
555 let type_name = procedure.type_name().to_string();
556 let data = procedure.dump().unwrap();
557 store
558 .store_procedure(procedure_id, 0, type_name, data, None)
559 .await
560 .unwrap();
561 store.commit_procedure(procedure_id, 1).await.unwrap();
562
563 let ProcedureMessages {
564 messages,
565 rollback_messages,
566 finished_ids,
567 } = store.load_messages().await.unwrap();
568 assert!(messages.is_empty());
569 assert!(rollback_messages.is_empty());
570 assert_eq!(&[procedure_id], &finished_ids[..]);
571 }
572
573 #[tokio::test]
574 async fn test_rollback_procedure() {
575 let dir = create_temp_dir("rollback_procedure");
576 let store = procedure_store_for_test(&dir);
577
578 let procedure_id = ProcedureId::random();
579 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
580 let type_name = procedure.type_name().to_string();
581 let data = procedure.dump().unwrap();
582 store
583 .store_procedure(procedure_id, 0, type_name.clone(), data.clone(), None)
584 .await
585 .unwrap();
586 let message = ProcedureMessage {
587 type_name,
588 data,
589 parent_id: None,
590 step: 1,
591 error: None,
592 };
593 store
594 .rollback_procedure(procedure_id, message)
595 .await
596 .unwrap();
597
598 let ProcedureMessages {
599 messages,
600 rollback_messages,
601 finished_ids,
602 } = store.load_messages().await.unwrap();
603 assert!(messages.is_empty());
604 assert_eq!(1, rollback_messages.len());
605 assert!(finished_ids.is_empty());
606 assert!(rollback_messages.contains_key(&procedure_id));
607 }
608
609 #[tokio::test]
610 async fn test_delete_procedure() {
611 let dir = create_temp_dir("delete_procedure");
612 let store = procedure_store_for_test(&dir);
613
614 let procedure_id = ProcedureId::random();
615 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
616 let type_name = procedure.type_name().to_string();
617 let data = procedure.dump().unwrap();
618 store
619 .store_procedure(procedure_id, 0, type_name, data, None)
620 .await
621 .unwrap();
622 let type_name = procedure.type_name().to_string();
623 let data = procedure.dump().unwrap();
624 store
625 .store_procedure(procedure_id, 1, type_name, data, None)
626 .await
627 .unwrap();
628
629 store.delete_procedure(procedure_id).await.unwrap();
630
631 let ProcedureMessages {
632 messages,
633 rollback_messages,
634 finished_ids,
635 } = store.load_messages().await.unwrap();
636 assert!(messages.is_empty());
637 assert!(rollback_messages.is_empty());
638 assert!(finished_ids.is_empty());
639 }
640
641 #[tokio::test]
642 async fn test_delete_committed_procedure() {
643 let dir = create_temp_dir("delete_committed");
644 let store = procedure_store_for_test(&dir);
645
646 let procedure_id = ProcedureId::random();
647 let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
648
649 let type_name = procedure.type_name().to_string();
650 let data = procedure.dump().unwrap();
651 store
652 .store_procedure(procedure_id, 0, type_name, data, None)
653 .await
654 .unwrap();
655
656 let type_name = procedure.type_name().to_string();
657 let data = procedure.dump().unwrap();
658 store
659 .store_procedure(procedure_id, 1, type_name, data, None)
660 .await
661 .unwrap();
662 store.commit_procedure(procedure_id, 2).await.unwrap();
663
664 store.delete_procedure(procedure_id).await.unwrap();
665
666 let ProcedureMessages {
667 messages,
668 rollback_messages,
669 finished_ids,
670 } = store.load_messages().await.unwrap();
671 assert!(messages.is_empty());
672 assert!(rollback_messages.is_empty());
673 assert!(finished_ids.is_empty());
674 }
675
676 #[tokio::test]
677 async fn test_load_messages() {
678 let dir = create_temp_dir("load_messages");
679 let store = procedure_store_for_test(&dir);
680
681 let id0 = ProcedureId::random();
683 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-0"));
684 let type_name = procedure.type_name().to_string();
685 let data = procedure.dump().unwrap();
686 store
687 .store_procedure(id0, 0, type_name, data, None)
688 .await
689 .unwrap();
690 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-1"));
691 let type_name = procedure.type_name().to_string();
692 let data = procedure.dump().unwrap();
693 store
694 .store_procedure(id0, 1, type_name, data, None)
695 .await
696 .unwrap();
697 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-2"));
698 let type_name = procedure.type_name().to_string();
699 let data = procedure.dump().unwrap();
700 store
701 .store_procedure(id0, 2, type_name, data, None)
702 .await
703 .unwrap();
704
705 let id1 = ProcedureId::random();
707 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-0"));
708 let type_name = procedure.type_name().to_string();
709 let data = procedure.dump().unwrap();
710 store
711 .store_procedure(id1, 0, type_name, data, None)
712 .await
713 .unwrap();
714 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-1"));
715 let type_name = procedure.type_name().to_string();
716 let data = procedure.dump().unwrap();
717 store
718 .store_procedure(id1, 1, type_name, data, None)
719 .await
720 .unwrap();
721 store.commit_procedure(id1, 2).await.unwrap();
722
723 let id2 = ProcedureId::random();
725 let procedure: BoxedProcedure = Box::new(MockProcedure::new("id2-0"));
726 let type_name = procedure.type_name().to_string();
727 let data = procedure.dump().unwrap();
728 store
729 .store_procedure(id2, 0, type_name, data, None)
730 .await
731 .unwrap();
732
733 let ProcedureMessages {
734 messages,
735 rollback_messages,
736 finished_ids,
737 } = store.load_messages().await.unwrap();
738 assert_eq!(2, messages.len());
739 assert!(rollback_messages.is_empty());
740 assert_eq!(1, finished_ids.len());
741
742 let msg = messages.get(&id0).unwrap();
743 assert_eq!("id0-2", msg.data);
744 let msg = messages.get(&id2).unwrap();
745 assert_eq!("id2-0", msg.data);
746 }
747}