common_procedure/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17
18use common_telemetry::{debug, error, info, warn};
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::ResultExt;
22
23use crate::error::{Result, ToJsonSnafu};
24pub(crate) use crate::store::state_store::StateStoreRef;
25use crate::ProcedureId;
26
27pub mod poison_store;
28pub mod state_store;
29pub mod util;
30
/// Key prefix of procedure store.
///
/// [ProcedureStore::new] appends this segment to the caller-supplied parent path; the result
/// is the prefix used for every key the store builds.
const PROC_PATH: &str = "procedure/";
33
/// Constructs a path for procedure store.
///
/// Expands to a `String` consisting of `$store.proc_path()` followed by the formatted
/// remainder.
///
/// - First arm: only a format string, which may rely on inline captures such as
///   `"{procedure_id}/"`.
/// - Second arm: a format string plus explicit arguments forwarded to `format_args!`.
macro_rules! proc_path {
    ($store: expr, $fmt:expr) => { format!("{}{}", $store.proc_path(), format_args!($fmt)) };
    ($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) };
}
39
40pub(crate) use proc_path;
41
/// Serialized data of a procedure.
///
/// This is the JSON payload stored under `.step` and `.rollback` keys.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureMessage {
    /// Type name of the procedure. The procedure framework also uses the type name to
    /// find a loader to load the procedure.
    pub type_name: String,
    /// The data of the procedure.
    pub data: String,
    /// Parent procedure id.
    pub parent_id: Option<ProcedureId>,
    /// Current step.
    pub step: u32,
    /// Errors raised during the procedure.
    ///
    /// Omitted from the serialized form when `None`, and defaults to `None` when the field
    /// is absent from the input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
58
/// A collection of all procedures' messages.
///
/// Produced by `ProcedureStore::load_messages`, which classifies each procedure by the type
/// of its highest-step key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureMessages {
    /// A map of uncommitted procedures (latest key is a `.step` key).
    pub messages: HashMap<ProcedureId, ProcedureMessage>,
    /// A map of rolling back procedures (latest key is a `.rollback` key).
    pub rollback_messages: HashMap<ProcedureId, ProcedureMessage>,
    /// A list of finished procedures' ids (latest key is a `.commit` key).
    pub finished_ids: Vec<ProcedureId>,
}
69
/// Procedure storage layer.
///
/// Translates procedure ids, steps and key types into keys of the underlying state store.
pub(crate) struct ProcedureStore {
    /// Prefix prepended to every key built by this store (parent path + `procedure/`).
    proc_path: String,
    /// Backing state store that performs the actual reads, writes and deletes.
    store: StateStoreRef,
}
75
76impl ProcedureStore {
77    /// Creates a new [ProcedureStore] from specific [StateStoreRef].
78    pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
79        let proc_path = format!("{}{PROC_PATH}", parent_path);
80        info!("The procedure state store path is: {}", &proc_path);
81        ProcedureStore { proc_path, store }
82    }
83
84    #[inline]
85    pub(crate) fn proc_path(&self) -> &str {
86        &self.proc_path
87    }
88
89    /// Dump the `procedure` to the storage.
90    pub(crate) async fn store_procedure(
91        &self,
92        procedure_id: ProcedureId,
93        step: u32,
94        type_name: String,
95        data: String,
96        parent_id: Option<ProcedureId>,
97    ) -> Result<()> {
98        let message = ProcedureMessage {
99            type_name,
100            data,
101            parent_id,
102            step,
103            error: None,
104        };
105        let key = ParsedKey {
106            prefix: &self.proc_path,
107            procedure_id,
108            step,
109            key_type: KeyType::Step,
110        }
111        .to_string();
112        let value = serde_json::to_string(&message).context(ToJsonSnafu)?;
113
114        self.store.put(&key, value.into_bytes()).await?;
115
116        Ok(())
117    }
118
119    /// Write commit flag to the storage.
120    pub(crate) async fn commit_procedure(
121        &self,
122        procedure_id: ProcedureId,
123        step: u32,
124    ) -> Result<()> {
125        let key = ParsedKey {
126            prefix: &self.proc_path,
127            procedure_id,
128            step,
129            key_type: KeyType::Commit,
130        }
131        .to_string();
132        self.store.put(&key, Vec::new()).await?;
133
134        Ok(())
135    }
136
137    /// Write rollback flag to the storage.
138    pub(crate) async fn rollback_procedure(
139        &self,
140        procedure_id: ProcedureId,
141        message: ProcedureMessage,
142    ) -> Result<()> {
143        let key = ParsedKey {
144            prefix: &self.proc_path,
145            procedure_id,
146            step: message.step,
147            key_type: KeyType::Rollback,
148        }
149        .to_string();
150
151        self.store
152            .put(&key, serde_json::to_vec(&message).context(ToJsonSnafu)?)
153            .await?;
154
155        Ok(())
156    }
157
158    /// Delete states of procedure from the storage.
159    pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> {
160        let path = proc_path!(self, "{procedure_id}/");
161        // TODO(yingwen): We can optimize this to avoid reading the value.
162        let mut key_values = self.store.walk_top_down(&path).await?;
163        // 8 should be enough for most procedures.
164        let mut step_keys = Vec::with_capacity(8);
165        let mut finish_keys = Vec::new();
166        while let Some((key_set, _)) = key_values.try_next().await? {
167            let key = key_set.key();
168            let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
169                warn!("Unknown key while deleting procedures, key: {}", key);
170                continue;
171            };
172            if curr_key.key_type == KeyType::Step {
173                step_keys.extend(key_set.keys());
174            } else {
175                // .commit or .rollback
176                finish_keys.extend(key_set.keys());
177            }
178        }
179
180        debug!(
181            "Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
182            procedure_id, step_keys, finish_keys
183        );
184        // We delete all step keys first.
185        self.store.batch_delete(step_keys.as_slice()).await?;
186        // Then we delete the finish keys, to ensure
187        self.store.batch_delete(finish_keys.as_slice()).await?;
188        // Finally we remove the directory itself.
189        self.store.delete(&path).await?;
190        // Maybe we could use procedure_id.commit/rollback as the file name so we could
191        // use remove_all to remove the directory and then remove the commit/rollback file.
192
193        Ok(())
194    }
195
196    /// Load procedures from the storage.
197    pub(crate) async fn load_messages(&self) -> Result<ProcedureMessages> {
198        // Track the key-value pair by procedure id.
199        let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
200
201        // Scan all procedures.
202        let mut key_values = self.store.walk_top_down(&self.proc_path).await?;
203        while let Some((key_set, value)) = key_values.try_next().await? {
204            let key = key_set.key();
205            let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
206                warn!("Unknown key while loading procedures, key: {}", key);
207                continue;
208            };
209
210            if let Some(entry) = procedure_key_values.get_mut(&curr_key.procedure_id) {
211                if entry.0.step < curr_key.step {
212                    entry.0 = curr_key;
213                    entry.1 = value;
214                }
215            } else {
216                let _ = procedure_key_values.insert(curr_key.procedure_id, (curr_key, value));
217            }
218        }
219
220        let mut messages = HashMap::with_capacity(procedure_key_values.len());
221        let mut rollback_messages = HashMap::new();
222        let mut finished_ids = Vec::new();
223        for (procedure_id, (parsed_key, value)) in procedure_key_values {
224            match parsed_key.key_type {
225                KeyType::Step => {
226                    let Some(message) = self.load_one_message(&parsed_key, &value) else {
227                        // We don't abort the loading process and just ignore errors to ensure all remaining
228                        // procedures are loaded.
229                        continue;
230                    };
231                    let _ = messages.insert(procedure_id, message);
232                }
233                KeyType::Commit => {
234                    finished_ids.push(procedure_id);
235                }
236                KeyType::Rollback => {
237                    let Some(message) = self.load_one_message(&parsed_key, &value) else {
238                        // We don't abort the loading process and just ignore errors to ensure all remaining
239                        // procedures are loaded.
240                        continue;
241                    };
242                    let _ = rollback_messages.insert(procedure_id, message);
243                }
244            }
245        }
246
247        Ok(ProcedureMessages {
248            messages,
249            rollback_messages,
250            finished_ids,
251        })
252    }
253
254    fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
255        serde_json::from_slice(value)
256            .map_err(|e| {
257                // `e` doesn't impl ErrorExt so we print it as normal error.
258                error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
259                e
260            })
261            .ok()
262    }
263}
264
/// Suffix type of the key.
///
/// The suffix is the part of a key's file name after the `.` separator.
#[derive(Debug, PartialEq, Eq)]
enum KeyType {
    /// `.step`
    Step,
    /// `.commit`
    Commit,
    /// `.rollback`
    Rollback,
}

impl KeyType {
    /// Returns the suffix text used in keys for this type.
    fn as_str(&self) -> &'static str {
        match *self {
            KeyType::Step => "step",
            KeyType::Commit => "commit",
            KeyType::Rollback => "rollback",
        }
    }

    /// Parses a suffix; returns `None` unless `s` is exactly one of the known suffixes.
    fn from_str(s: &str) -> Option<KeyType> {
        // Derive the lookup from `as_str` so the two directions cannot drift apart.
        const VARIANTS: [KeyType; 3] = [KeyType::Step, KeyType::Commit, KeyType::Rollback];
        IntoIterator::into_iter(VARIANTS).find(|variant| variant.as_str() == s)
    }
}
291
/// Key to refer the procedure in the [ProcedureStore].
///
/// Rendered by its `Display` impl as `{prefix}{procedure_id}/{step:010}.{suffix}`.
#[derive(Debug, PartialEq, Eq)]
struct ParsedKey<'a> {
    /// Store prefix that precedes the procedure id in the rendered key.
    prefix: &'a str,
    /// Id of the procedure the key belongs to.
    procedure_id: ProcedureId,
    /// Step number; zero-padded to 10 digits when rendered.
    step: u32,
    /// Kind of key, rendered as the file-name suffix.
    key_type: KeyType,
}
300
301impl fmt::Display for ParsedKey<'_> {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        write!(
304            f,
305            "{}{}/{:010}.{}",
306            self.prefix,
307            self.procedure_id,
308            self.step,
309            self.key_type.as_str(),
310        )
311    }
312}
313
314impl<'a> ParsedKey<'a> {
315    /// Try to parse the key from specific `input`.
316    fn parse_str(prefix: &'a str, input: &str) -> Option<ParsedKey<'a>> {
317        let input = input.strip_prefix(prefix)?;
318        let mut iter = input.rsplit('/');
319        let name = iter.next()?;
320        let id_str = iter.next()?;
321
322        let procedure_id = ProcedureId::parse_str(id_str).ok()?;
323
324        let mut parts = name.split('.');
325        let step_str = parts.next()?;
326        let suffix = parts.next()?;
327        let key_type = KeyType::from_str(suffix)?;
328        let step = step_str.parse().ok()?;
329
330        Some(ParsedKey {
331            prefix,
332            procedure_id,
333            step,
334            key_type,
335        })
336    }
337}
338
339#[cfg(test)]
340mod tests {
341    use std::sync::Arc;
342
343    use object_store::ObjectStore;
344
345    use crate::procedure::PoisonKeys;
346    use crate::store::state_store::ObjectStateStore;
347    use crate::BoxedProcedure;
348
349    impl ProcedureStore {
350        pub(crate) fn from_object_store(store: ObjectStore) -> ProcedureStore {
351            let state_store = ObjectStateStore::new(store);
352
353            ProcedureStore::new("data/", Arc::new(state_store))
354        }
355    }
356
357    use async_trait::async_trait;
358    use common_test_util::temp_dir::{create_temp_dir, TempDir};
359    use object_store::services::Fs as Builder;
360
361    use super::*;
362    use crate::{Context, LockKey, Procedure, Status};
363
364    fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
365        let store_dir = dir.path().to_str().unwrap();
366        let builder = Builder::default().root(store_dir);
367        let object_store = ObjectStore::new(builder).unwrap().finish();
368
369        ProcedureStore::from_object_store(object_store)
370    }
371
372    #[test]
373    fn test_parsed_key() {
374        let dir = create_temp_dir("store_procedure");
375        let store = procedure_store_for_test(&dir);
376
377        let procedure_id = ProcedureId::random();
378        let key = ParsedKey {
379            prefix: &store.proc_path,
380            procedure_id,
381            step: 2,
382            key_type: KeyType::Step,
383        };
384        assert_eq!(
385            proc_path!(store, "{procedure_id}/0000000002.step"),
386            key.to_string()
387        );
388        assert_eq!(
389            key,
390            ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
391        );
392
393        let key = ParsedKey {
394            prefix: &store.proc_path,
395            procedure_id,
396            step: 2,
397            key_type: KeyType::Commit,
398        };
399        assert_eq!(
400            proc_path!(store, "{procedure_id}/0000000002.commit"),
401            key.to_string()
402        );
403        assert_eq!(
404            key,
405            ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
406        );
407
408        let key = ParsedKey {
409            prefix: &store.proc_path,
410            procedure_id,
411            step: 2,
412            key_type: KeyType::Rollback,
413        };
414        assert_eq!(
415            proc_path!(store, "{procedure_id}/0000000002.rollback"),
416            key.to_string()
417        );
418        assert_eq!(
419            key,
420            ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
421        );
422    }
423
424    #[test]
425    fn test_parse_invalid_key() {
426        let dir = create_temp_dir("store_procedure");
427        let store = procedure_store_for_test(&dir);
428
429        assert!(ParsedKey::parse_str(&store.proc_path, "").is_none());
430        assert!(ParsedKey::parse_str(&store.proc_path, "invalidprefix").is_none());
431        assert!(ParsedKey::parse_str(&store.proc_path, "procedu/0000000003.step").is_none());
432        assert!(ParsedKey::parse_str(&store.proc_path, "procedure-0000000003.step").is_none());
433
434        let procedure_id = ProcedureId::random();
435        let input = proc_path!(store, "{procedure_id}");
436        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
437
438        let input = proc_path!(store, "{procedure_id}");
439        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
440
441        let input = proc_path!(store, "{procedure_id}/0000000003");
442        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
443
444        let input = proc_path!(store, "{procedure_id}/0000000003.");
445        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
446
447        let input = proc_path!(store, "{procedure_id}/0000000003.other");
448        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
449
450        assert!(ParsedKey::parse_str(&store.proc_path, "12345/0000000003.step").is_none());
451
452        let input = proc_path!(store, "{procedure_id}-0000000003.commit");
453        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
454    }
455
456    #[test]
457    fn test_procedure_message() {
458        let mut message = ProcedureMessage {
459            type_name: "TestMessage".to_string(),
460            data: "no parent id".to_string(),
461            parent_id: None,
462            step: 4,
463            error: None,
464        };
465
466        let json = serde_json::to_string(&message).unwrap();
467        assert_eq!(
468            json,
469            r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null,"step":4}"#
470        );
471
472        let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap();
473        message.parent_id = Some(procedure_id);
474        let json = serde_json::to_string(&message).unwrap();
475        assert_eq!(
476            json,
477            r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1","step":4}"#
478        );
479    }
480
481    struct MockProcedure {
482        data: String,
483    }
484
485    impl MockProcedure {
486        fn new(data: impl Into<String>) -> MockProcedure {
487            MockProcedure { data: data.into() }
488        }
489    }
490
491    #[async_trait]
492    impl Procedure for MockProcedure {
493        fn type_name(&self) -> &str {
494            "MockProcedure"
495        }
496
497        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
498            unimplemented!()
499        }
500
501        fn dump(&self) -> Result<String> {
502            Ok(self.data.clone())
503        }
504
505        fn lock_key(&self) -> LockKey {
506            LockKey::default()
507        }
508
509        fn poison_keys(&self) -> PoisonKeys {
510            PoisonKeys::default()
511        }
512    }
513
514    #[tokio::test]
515    async fn test_store_procedure() {
516        let dir = create_temp_dir("store_procedure");
517        let store = procedure_store_for_test(&dir);
518
519        let procedure_id = ProcedureId::random();
520        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
521        let type_name = procedure.type_name().to_string();
522        let data = procedure.dump().unwrap();
523        store
524            .store_procedure(procedure_id, 0, type_name, data, None)
525            .await
526            .unwrap();
527
528        let ProcedureMessages {
529            messages,
530            rollback_messages,
531            finished_ids,
532        } = store.load_messages().await.unwrap();
533        assert_eq!(1, messages.len());
534        assert!(rollback_messages.is_empty());
535        assert!(finished_ids.is_empty());
536        let msg = messages.get(&procedure_id).unwrap();
537        let expect = ProcedureMessage {
538            type_name: "MockProcedure".to_string(),
539            data: "test store procedure".to_string(),
540            parent_id: None,
541            step: 0,
542            error: None,
543        };
544        assert_eq!(expect, *msg);
545    }
546
547    #[tokio::test]
548    async fn test_commit_procedure() {
549        let dir = create_temp_dir("commit_procedure");
550        let store = procedure_store_for_test(&dir);
551
552        let procedure_id = ProcedureId::random();
553        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
554        let type_name = procedure.type_name().to_string();
555        let data = procedure.dump().unwrap();
556        store
557            .store_procedure(procedure_id, 0, type_name, data, None)
558            .await
559            .unwrap();
560        store.commit_procedure(procedure_id, 1).await.unwrap();
561
562        let ProcedureMessages {
563            messages,
564            rollback_messages,
565            finished_ids,
566        } = store.load_messages().await.unwrap();
567        assert!(messages.is_empty());
568        assert!(rollback_messages.is_empty());
569        assert_eq!(&[procedure_id], &finished_ids[..]);
570    }
571
572    #[tokio::test]
573    async fn test_rollback_procedure() {
574        let dir = create_temp_dir("rollback_procedure");
575        let store = procedure_store_for_test(&dir);
576
577        let procedure_id = ProcedureId::random();
578        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
579        let type_name = procedure.type_name().to_string();
580        let data = procedure.dump().unwrap();
581        store
582            .store_procedure(
583                procedure_id,
584                0,
585                type_name.to_string(),
586                data.to_string(),
587                None,
588            )
589            .await
590            .unwrap();
591        let message = ProcedureMessage {
592            type_name,
593            data,
594            parent_id: None,
595            step: 1,
596            error: None,
597        };
598        store
599            .rollback_procedure(procedure_id, message)
600            .await
601            .unwrap();
602
603        let ProcedureMessages {
604            messages,
605            rollback_messages,
606            finished_ids,
607        } = store.load_messages().await.unwrap();
608        assert!(messages.is_empty());
609        assert_eq!(1, rollback_messages.len());
610        assert!(finished_ids.is_empty());
611        assert!(rollback_messages.contains_key(&procedure_id));
612    }
613
614    #[tokio::test]
615    async fn test_delete_procedure() {
616        let dir = create_temp_dir("delete_procedure");
617        let store = procedure_store_for_test(&dir);
618
619        let procedure_id = ProcedureId::random();
620        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
621        let type_name = procedure.type_name().to_string();
622        let data = procedure.dump().unwrap();
623        store
624            .store_procedure(procedure_id, 0, type_name, data, None)
625            .await
626            .unwrap();
627        let type_name = procedure.type_name().to_string();
628        let data = procedure.dump().unwrap();
629        store
630            .store_procedure(procedure_id, 1, type_name, data, None)
631            .await
632            .unwrap();
633
634        store.delete_procedure(procedure_id).await.unwrap();
635
636        let ProcedureMessages {
637            messages,
638            rollback_messages,
639            finished_ids,
640        } = store.load_messages().await.unwrap();
641        assert!(messages.is_empty());
642        assert!(rollback_messages.is_empty());
643        assert!(finished_ids.is_empty());
644    }
645
646    #[tokio::test]
647    async fn test_delete_committed_procedure() {
648        let dir = create_temp_dir("delete_committed");
649        let store = procedure_store_for_test(&dir);
650
651        let procedure_id = ProcedureId::random();
652        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
653
654        let type_name = procedure.type_name().to_string();
655        let data = procedure.dump().unwrap();
656        store
657            .store_procedure(procedure_id, 0, type_name, data, None)
658            .await
659            .unwrap();
660
661        let type_name = procedure.type_name().to_string();
662        let data = procedure.dump().unwrap();
663        store
664            .store_procedure(procedure_id, 1, type_name, data, None)
665            .await
666            .unwrap();
667        store.commit_procedure(procedure_id, 2).await.unwrap();
668
669        store.delete_procedure(procedure_id).await.unwrap();
670
671        let ProcedureMessages {
672            messages,
673            rollback_messages,
674            finished_ids,
675        } = store.load_messages().await.unwrap();
676        assert!(messages.is_empty());
677        assert!(rollback_messages.is_empty());
678        assert!(finished_ids.is_empty());
679    }
680
681    #[tokio::test]
682    async fn test_load_messages() {
683        let dir = create_temp_dir("load_messages");
684        let store = procedure_store_for_test(&dir);
685
686        // store 3 steps
687        let id0 = ProcedureId::random();
688        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-0"));
689        let type_name = procedure.type_name().to_string();
690        let data = procedure.dump().unwrap();
691        store
692            .store_procedure(id0, 0, type_name, data, None)
693            .await
694            .unwrap();
695        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-1"));
696        let type_name = procedure.type_name().to_string();
697        let data = procedure.dump().unwrap();
698        store
699            .store_procedure(id0, 1, type_name, data, None)
700            .await
701            .unwrap();
702        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-2"));
703        let type_name = procedure.type_name().to_string();
704        let data = procedure.dump().unwrap();
705        store
706            .store_procedure(id0, 2, type_name, data, None)
707            .await
708            .unwrap();
709
710        // store 2 steps and then commit
711        let id1 = ProcedureId::random();
712        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-0"));
713        let type_name = procedure.type_name().to_string();
714        let data = procedure.dump().unwrap();
715        store
716            .store_procedure(id1, 0, type_name, data, None)
717            .await
718            .unwrap();
719        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-1"));
720        let type_name = procedure.type_name().to_string();
721        let data = procedure.dump().unwrap();
722        store
723            .store_procedure(id1, 1, type_name, data, None)
724            .await
725            .unwrap();
726        store.commit_procedure(id1, 2).await.unwrap();
727
728        // store 1 step
729        let id2 = ProcedureId::random();
730        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id2-0"));
731        let type_name = procedure.type_name().to_string();
732        let data = procedure.dump().unwrap();
733        store
734            .store_procedure(id2, 0, type_name, data, None)
735            .await
736            .unwrap();
737
738        let ProcedureMessages {
739            messages,
740            rollback_messages,
741            finished_ids,
742        } = store.load_messages().await.unwrap();
743        assert_eq!(2, messages.len());
744        assert!(rollback_messages.is_empty());
745        assert_eq!(1, finished_ids.len());
746
747        let msg = messages.get(&id0).unwrap();
748        assert_eq!("id0-2", msg.data);
749        let msg = messages.get(&id2).unwrap();
750        assert_eq!("id2-0", msg.data);
751    }
752}