common_procedure/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17
18use common_telemetry::{debug, error, info, warn};
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::ResultExt;
22
23use crate::ProcedureId;
24use crate::error::{Result, ToJsonSnafu};
25pub(crate) use crate::store::state_store::StateStoreRef;
26
27pub mod poison_store;
28pub mod state_store;
29pub mod util;
30
31/// Key prefix of procedure store.
32const PROC_PATH: &str = "procedure/";
33
34/// Constructs a path for procedure store.
35macro_rules! proc_path {
36    ($store: expr, $fmt:expr) => { format!("{}{}", $store.proc_path(), format_args!($fmt)) };
37    ($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) };
38}
39
40#[cfg(test)]
41pub(crate) use proc_path;
42
43/// Serialized data of a procedure.
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45pub struct ProcedureMessage {
46    /// Type name of the procedure. The procedure framework also use the type name to
47    /// find a loader to load the procedure.
48    pub type_name: String,
49    /// The data of the procedure.
50    pub data: String,
51    /// Parent procedure id.
52    pub parent_id: Option<ProcedureId>,
53    /// Current step.
54    pub step: u32,
55    /// Errors raised during the procedure.
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub error: Option<String>,
58}
59
60/// A collection of all procedures' messages.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct ProcedureMessages {
63    /// A map of uncommitted procedures
64    pub messages: HashMap<ProcedureId, ProcedureMessage>,
65    /// A map of rolling back procedures
66    pub rollback_messages: HashMap<ProcedureId, ProcedureMessage>,
67    /// A list of finished procedures' ids
68    pub finished_ids: Vec<ProcedureId>,
69}
70
71/// Procedure storage layer.
72pub(crate) struct ProcedureStore {
73    proc_path: String,
74    store: StateStoreRef,
75}
76
77impl ProcedureStore {
78    /// Creates a new [ProcedureStore] from specific [StateStoreRef].
79    pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
80        let proc_path = format!("{}{PROC_PATH}", parent_path);
81        info!("The procedure state store path is: {}", &proc_path);
82        ProcedureStore { proc_path, store }
83    }
84
85    #[inline]
86    pub(crate) fn proc_path(&self) -> &str {
87        &self.proc_path
88    }
89
90    /// Dump the `procedure` to the storage.
91    pub(crate) async fn store_procedure(
92        &self,
93        procedure_id: ProcedureId,
94        step: u32,
95        type_name: String,
96        data: String,
97        parent_id: Option<ProcedureId>,
98    ) -> Result<()> {
99        let message = ProcedureMessage {
100            type_name,
101            data,
102            parent_id,
103            step,
104            error: None,
105        };
106        let key = ParsedKey {
107            prefix: &self.proc_path,
108            procedure_id,
109            step,
110            key_type: KeyType::Step,
111        }
112        .to_string();
113        let value = serde_json::to_string(&message).context(ToJsonSnafu)?;
114
115        self.store.put(&key, value.into_bytes()).await?;
116
117        Ok(())
118    }
119
120    /// Write commit flag to the storage.
121    pub(crate) async fn commit_procedure(
122        &self,
123        procedure_id: ProcedureId,
124        step: u32,
125    ) -> Result<()> {
126        let key = ParsedKey {
127            prefix: &self.proc_path,
128            procedure_id,
129            step,
130            key_type: KeyType::Commit,
131        }
132        .to_string();
133        self.store.put(&key, Vec::new()).await?;
134
135        Ok(())
136    }
137
138    /// Write rollback flag to the storage.
139    pub(crate) async fn rollback_procedure(
140        &self,
141        procedure_id: ProcedureId,
142        message: ProcedureMessage,
143    ) -> Result<()> {
144        let key = ParsedKey {
145            prefix: &self.proc_path,
146            procedure_id,
147            step: message.step,
148            key_type: KeyType::Rollback,
149        }
150        .to_string();
151
152        self.store
153            .put(&key, serde_json::to_vec(&message).context(ToJsonSnafu)?)
154            .await?;
155
156        Ok(())
157    }
158
159    /// Delete states of procedure from the storage.
160    pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> {
161        let path = proc_path!(self, "{procedure_id}/");
162        // TODO(yingwen): We can optimize this to avoid reading the value.
163        let mut key_values = self.store.walk_top_down(&path).await?;
164        // 8 should be enough for most procedures.
165        let mut step_keys = Vec::with_capacity(8);
166        let mut finish_keys = Vec::new();
167        while let Some((key_set, _)) = key_values.try_next().await? {
168            let key = key_set.key();
169            let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
170                warn!("Unknown key while deleting procedures, key: {}", key);
171                continue;
172            };
173            if curr_key.key_type == KeyType::Step {
174                step_keys.extend(key_set.keys());
175            } else {
176                // .commit or .rollback
177                finish_keys.extend(key_set.keys());
178            }
179        }
180
181        debug!(
182            "Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
183            procedure_id, step_keys, finish_keys
184        );
185        // We delete all step keys first.
186        self.store.batch_delete(step_keys.as_slice()).await?;
187        // Then we delete the finish keys, to ensure
188        self.store.batch_delete(finish_keys.as_slice()).await?;
189        // Finally we remove the directory itself.
190        self.store.delete(&path).await?;
191        // Maybe we could use procedure_id.commit/rollback as the file name so we could
192        // use remove_all to remove the directory and then remove the commit/rollback file.
193
194        Ok(())
195    }
196
197    /// Load procedures from the storage.
198    pub(crate) async fn load_messages(&self) -> Result<ProcedureMessages> {
199        // Track the key-value pair by procedure id.
200        let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
201
202        // Scan all procedures.
203        let mut key_values = self.store.walk_top_down(&self.proc_path).await?;
204        while let Some((key_set, value)) = key_values.try_next().await? {
205            let key = key_set.key();
206            let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
207                warn!("Unknown key while loading procedures, key: {}", key);
208                continue;
209            };
210
211            if let Some(entry) = procedure_key_values.get_mut(&curr_key.procedure_id) {
212                if entry.0.step < curr_key.step {
213                    entry.0 = curr_key;
214                    entry.1 = value;
215                }
216            } else {
217                let _ = procedure_key_values.insert(curr_key.procedure_id, (curr_key, value));
218            }
219        }
220
221        let mut messages = HashMap::with_capacity(procedure_key_values.len());
222        let mut rollback_messages = HashMap::new();
223        let mut finished_ids = Vec::new();
224        for (procedure_id, (parsed_key, value)) in procedure_key_values {
225            match parsed_key.key_type {
226                KeyType::Step => {
227                    let Some(message) = self.load_one_message(&parsed_key, &value) else {
228                        // We don't abort the loading process and just ignore errors to ensure all remaining
229                        // procedures are loaded.
230                        continue;
231                    };
232                    let _ = messages.insert(procedure_id, message);
233                }
234                KeyType::Commit => {
235                    finished_ids.push(procedure_id);
236                }
237                KeyType::Rollback => {
238                    let Some(message) = self.load_one_message(&parsed_key, &value) else {
239                        // We don't abort the loading process and just ignore errors to ensure all remaining
240                        // procedures are loaded.
241                        continue;
242                    };
243                    let _ = rollback_messages.insert(procedure_id, message);
244                }
245            }
246        }
247
248        Ok(ProcedureMessages {
249            messages,
250            rollback_messages,
251            finished_ids,
252        })
253    }
254
255    fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
256        serde_json::from_slice(value)
257            .map_err(|e| {
258                // `e` doesn't impl ErrorExt so we print it as normal error.
259                error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
260                e
261            })
262            .ok()
263    }
264}
265
266/// Suffix type of the key.
267#[derive(Debug, PartialEq, Eq)]
268enum KeyType {
269    Step,
270    Commit,
271    Rollback,
272}
273
274impl KeyType {
275    fn as_str(&self) -> &'static str {
276        match self {
277            KeyType::Step => "step",
278            KeyType::Commit => "commit",
279            KeyType::Rollback => "rollback",
280        }
281    }
282
283    fn from_str(s: &str) -> Option<KeyType> {
284        match s {
285            "step" => Some(KeyType::Step),
286            "commit" => Some(KeyType::Commit),
287            "rollback" => Some(KeyType::Rollback),
288            _ => None,
289        }
290    }
291}
292
293/// Key to refer the procedure in the [ProcedureStore].
294#[derive(Debug, PartialEq, Eq)]
295struct ParsedKey<'a> {
296    prefix: &'a str,
297    procedure_id: ProcedureId,
298    step: u32,
299    key_type: KeyType,
300}
301
302impl fmt::Display for ParsedKey<'_> {
303    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304        write!(
305            f,
306            "{}{}/{:010}.{}",
307            self.prefix,
308            self.procedure_id,
309            self.step,
310            self.key_type.as_str(),
311        )
312    }
313}
314
315impl<'a> ParsedKey<'a> {
316    /// Try to parse the key from specific `input`.
317    fn parse_str(prefix: &'a str, input: &str) -> Option<ParsedKey<'a>> {
318        let input = input.strip_prefix(prefix)?;
319        let mut iter = input.rsplit('/');
320        let name = iter.next()?;
321        let id_str = iter.next()?;
322
323        let procedure_id = ProcedureId::parse_str(id_str).ok()?;
324
325        let mut parts = name.split('.');
326        let step_str = parts.next()?;
327        let suffix = parts.next()?;
328        let key_type = KeyType::from_str(suffix)?;
329        let step = step_str.parse().ok()?;
330
331        Some(ParsedKey {
332            prefix,
333            procedure_id,
334            step,
335            key_type,
336        })
337    }
338}
339
340#[cfg(test)]
341mod tests {
342    use std::sync::Arc;
343
344    use object_store::ObjectStore;
345
346    use crate::BoxedProcedure;
347    use crate::procedure::PoisonKeys;
348    use crate::store::state_store::ObjectStateStore;
349
350    impl ProcedureStore {
351        pub(crate) fn from_object_store(store: ObjectStore) -> ProcedureStore {
352            let state_store = ObjectStateStore::new(store);
353
354            ProcedureStore::new("data/", Arc::new(state_store))
355        }
356    }
357
358    use async_trait::async_trait;
359    use common_test_util::temp_dir::{TempDir, create_temp_dir};
360    use object_store::services::Fs as Builder;
361
362    use super::*;
363    use crate::{Context, LockKey, Procedure, Status};
364
365    fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
366        let store_dir = dir.path().to_str().unwrap();
367        let builder = Builder::default().root(store_dir);
368        let object_store = ObjectStore::new(builder).unwrap().finish();
369
370        ProcedureStore::from_object_store(object_store)
371    }
372
373    #[test]
374    fn test_parsed_key() {
375        let dir = create_temp_dir("store_procedure");
376        let store = procedure_store_for_test(&dir);
377
378        let procedure_id = ProcedureId::random();
379        let key = ParsedKey {
380            prefix: &store.proc_path,
381            procedure_id,
382            step: 2,
383            key_type: KeyType::Step,
384        };
385        assert_eq!(
386            proc_path!(store, "{procedure_id}/0000000002.step"),
387            key.to_string()
388        );
389        assert_eq!(
390            key,
391            ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
392        );
393
394        let key = ParsedKey {
395            prefix: &store.proc_path,
396            procedure_id,
397            step: 2,
398            key_type: KeyType::Commit,
399        };
400        assert_eq!(
401            proc_path!(store, "{procedure_id}/0000000002.commit"),
402            key.to_string()
403        );
404        assert_eq!(
405            key,
406            ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
407        );
408
409        let key = ParsedKey {
410            prefix: &store.proc_path,
411            procedure_id,
412            step: 2,
413            key_type: KeyType::Rollback,
414        };
415        assert_eq!(
416            proc_path!(store, "{procedure_id}/0000000002.rollback"),
417            key.to_string()
418        );
419        assert_eq!(
420            key,
421            ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
422        );
423    }
424
425    #[test]
426    fn test_parse_invalid_key() {
427        let dir = create_temp_dir("store_procedure");
428        let store = procedure_store_for_test(&dir);
429
430        assert!(ParsedKey::parse_str(&store.proc_path, "").is_none());
431        assert!(ParsedKey::parse_str(&store.proc_path, "invalidprefix").is_none());
432        assert!(ParsedKey::parse_str(&store.proc_path, "procedu/0000000003.step").is_none());
433        assert!(ParsedKey::parse_str(&store.proc_path, "procedure-0000000003.step").is_none());
434
435        let procedure_id = ProcedureId::random();
436        let input = proc_path!(store, "{procedure_id}");
437        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
438
439        let input = proc_path!(store, "{procedure_id}");
440        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
441
442        let input = proc_path!(store, "{procedure_id}/0000000003");
443        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
444
445        let input = proc_path!(store, "{procedure_id}/0000000003.");
446        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
447
448        let input = proc_path!(store, "{procedure_id}/0000000003.other");
449        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
450
451        assert!(ParsedKey::parse_str(&store.proc_path, "12345/0000000003.step").is_none());
452
453        let input = proc_path!(store, "{procedure_id}-0000000003.commit");
454        assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
455    }
456
457    #[test]
458    fn test_procedure_message() {
459        let mut message = ProcedureMessage {
460            type_name: "TestMessage".to_string(),
461            data: "no parent id".to_string(),
462            parent_id: None,
463            step: 4,
464            error: None,
465        };
466
467        let json = serde_json::to_string(&message).unwrap();
468        assert_eq!(
469            json,
470            r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null,"step":4}"#
471        );
472
473        let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap();
474        message.parent_id = Some(procedure_id);
475        let json = serde_json::to_string(&message).unwrap();
476        assert_eq!(
477            json,
478            r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1","step":4}"#
479        );
480    }
481
482    struct MockProcedure {
483        data: String,
484    }
485
486    impl MockProcedure {
487        fn new(data: impl Into<String>) -> MockProcedure {
488            MockProcedure { data: data.into() }
489        }
490    }
491
492    #[async_trait]
493    impl Procedure for MockProcedure {
494        fn type_name(&self) -> &str {
495            "MockProcedure"
496        }
497
498        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
499            unimplemented!()
500        }
501
502        fn dump(&self) -> Result<String> {
503            Ok(self.data.clone())
504        }
505
506        fn lock_key(&self) -> LockKey {
507            LockKey::default()
508        }
509
510        fn poison_keys(&self) -> PoisonKeys {
511            PoisonKeys::default()
512        }
513    }
514
515    #[tokio::test]
516    async fn test_store_procedure() {
517        let dir = create_temp_dir("store_procedure");
518        let store = procedure_store_for_test(&dir);
519
520        let procedure_id = ProcedureId::random();
521        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
522        let type_name = procedure.type_name().to_string();
523        let data = procedure.dump().unwrap();
524        store
525            .store_procedure(procedure_id, 0, type_name, data, None)
526            .await
527            .unwrap();
528
529        let ProcedureMessages {
530            messages,
531            rollback_messages,
532            finished_ids,
533        } = store.load_messages().await.unwrap();
534        assert_eq!(1, messages.len());
535        assert!(rollback_messages.is_empty());
536        assert!(finished_ids.is_empty());
537        let msg = messages.get(&procedure_id).unwrap();
538        let expect = ProcedureMessage {
539            type_name: "MockProcedure".to_string(),
540            data: "test store procedure".to_string(),
541            parent_id: None,
542            step: 0,
543            error: None,
544        };
545        assert_eq!(expect, *msg);
546    }
547
548    #[tokio::test]
549    async fn test_commit_procedure() {
550        let dir = create_temp_dir("commit_procedure");
551        let store = procedure_store_for_test(&dir);
552
553        let procedure_id = ProcedureId::random();
554        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
555        let type_name = procedure.type_name().to_string();
556        let data = procedure.dump().unwrap();
557        store
558            .store_procedure(procedure_id, 0, type_name, data, None)
559            .await
560            .unwrap();
561        store.commit_procedure(procedure_id, 1).await.unwrap();
562
563        let ProcedureMessages {
564            messages,
565            rollback_messages,
566            finished_ids,
567        } = store.load_messages().await.unwrap();
568        assert!(messages.is_empty());
569        assert!(rollback_messages.is_empty());
570        assert_eq!(&[procedure_id], &finished_ids[..]);
571    }
572
573    #[tokio::test]
574    async fn test_rollback_procedure() {
575        let dir = create_temp_dir("rollback_procedure");
576        let store = procedure_store_for_test(&dir);
577
578        let procedure_id = ProcedureId::random();
579        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
580        let type_name = procedure.type_name().to_string();
581        let data = procedure.dump().unwrap();
582        store
583            .store_procedure(procedure_id, 0, type_name.clone(), data.clone(), None)
584            .await
585            .unwrap();
586        let message = ProcedureMessage {
587            type_name,
588            data,
589            parent_id: None,
590            step: 1,
591            error: None,
592        };
593        store
594            .rollback_procedure(procedure_id, message)
595            .await
596            .unwrap();
597
598        let ProcedureMessages {
599            messages,
600            rollback_messages,
601            finished_ids,
602        } = store.load_messages().await.unwrap();
603        assert!(messages.is_empty());
604        assert_eq!(1, rollback_messages.len());
605        assert!(finished_ids.is_empty());
606        assert!(rollback_messages.contains_key(&procedure_id));
607    }
608
609    #[tokio::test]
610    async fn test_delete_procedure() {
611        let dir = create_temp_dir("delete_procedure");
612        let store = procedure_store_for_test(&dir);
613
614        let procedure_id = ProcedureId::random();
615        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
616        let type_name = procedure.type_name().to_string();
617        let data = procedure.dump().unwrap();
618        store
619            .store_procedure(procedure_id, 0, type_name, data, None)
620            .await
621            .unwrap();
622        let type_name = procedure.type_name().to_string();
623        let data = procedure.dump().unwrap();
624        store
625            .store_procedure(procedure_id, 1, type_name, data, None)
626            .await
627            .unwrap();
628
629        store.delete_procedure(procedure_id).await.unwrap();
630
631        let ProcedureMessages {
632            messages,
633            rollback_messages,
634            finished_ids,
635        } = store.load_messages().await.unwrap();
636        assert!(messages.is_empty());
637        assert!(rollback_messages.is_empty());
638        assert!(finished_ids.is_empty());
639    }
640
641    #[tokio::test]
642    async fn test_delete_committed_procedure() {
643        let dir = create_temp_dir("delete_committed");
644        let store = procedure_store_for_test(&dir);
645
646        let procedure_id = ProcedureId::random();
647        let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
648
649        let type_name = procedure.type_name().to_string();
650        let data = procedure.dump().unwrap();
651        store
652            .store_procedure(procedure_id, 0, type_name, data, None)
653            .await
654            .unwrap();
655
656        let type_name = procedure.type_name().to_string();
657        let data = procedure.dump().unwrap();
658        store
659            .store_procedure(procedure_id, 1, type_name, data, None)
660            .await
661            .unwrap();
662        store.commit_procedure(procedure_id, 2).await.unwrap();
663
664        store.delete_procedure(procedure_id).await.unwrap();
665
666        let ProcedureMessages {
667            messages,
668            rollback_messages,
669            finished_ids,
670        } = store.load_messages().await.unwrap();
671        assert!(messages.is_empty());
672        assert!(rollback_messages.is_empty());
673        assert!(finished_ids.is_empty());
674    }
675
676    #[tokio::test]
677    async fn test_load_messages() {
678        let dir = create_temp_dir("load_messages");
679        let store = procedure_store_for_test(&dir);
680
681        // store 3 steps
682        let id0 = ProcedureId::random();
683        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-0"));
684        let type_name = procedure.type_name().to_string();
685        let data = procedure.dump().unwrap();
686        store
687            .store_procedure(id0, 0, type_name, data, None)
688            .await
689            .unwrap();
690        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-1"));
691        let type_name = procedure.type_name().to_string();
692        let data = procedure.dump().unwrap();
693        store
694            .store_procedure(id0, 1, type_name, data, None)
695            .await
696            .unwrap();
697        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-2"));
698        let type_name = procedure.type_name().to_string();
699        let data = procedure.dump().unwrap();
700        store
701            .store_procedure(id0, 2, type_name, data, None)
702            .await
703            .unwrap();
704
705        // store 2 steps and then commit
706        let id1 = ProcedureId::random();
707        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-0"));
708        let type_name = procedure.type_name().to_string();
709        let data = procedure.dump().unwrap();
710        store
711            .store_procedure(id1, 0, type_name, data, None)
712            .await
713            .unwrap();
714        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-1"));
715        let type_name = procedure.type_name().to_string();
716        let data = procedure.dump().unwrap();
717        store
718            .store_procedure(id1, 1, type_name, data, None)
719            .await
720            .unwrap();
721        store.commit_procedure(id1, 2).await.unwrap();
722
723        // store 1 step
724        let id2 = ProcedureId::random();
725        let procedure: BoxedProcedure = Box::new(MockProcedure::new("id2-0"));
726        let type_name = procedure.type_name().to_string();
727        let data = procedure.dump().unwrap();
728        store
729            .store_procedure(id2, 0, type_name, data, None)
730            .await
731            .unwrap();
732
733        let ProcedureMessages {
734            messages,
735            rollback_messages,
736            finished_ids,
737        } = store.load_messages().await.unwrap();
738        assert_eq!(2, messages.len());
739        assert!(rollback_messages.is_empty());
740        assert_eq!(1, finished_ids.len());
741
742        let msg = messages.get(&id0).unwrap();
743        assert_eq!("id0-2", msg.data);
744        let msg = messages.get(&id2).unwrap();
745        assert_eq!("id2-0", msg.data);
746    }
747}