mito2/wal/
entry_reader.rs1use api::v1::WalEntry;
16use async_stream::stream;
17use futures::StreamExt;
18use object_store::Buffer;
19use prost::Message;
20use snafu::{ensure, ResultExt};
21use store_api::logstore::entry::Entry;
22use store_api::logstore::provider::Provider;
23
24use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result};
25use crate::wal::raw_entry_reader::RawEntryReader;
26use crate::wal::{EntryId, WalEntryStream};
27
28pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> {
29 let entry_id = raw_entry.entry_id();
30 let region_id = raw_entry.region_id();
31 ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id });
32 let buffer = into_buffer(raw_entry);
33 let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
34 Ok((entry_id, wal_entry))
35}
36
37fn into_buffer(raw_entry: Entry) -> Buffer {
38 match raw_entry {
39 Entry::Naive(entry) => Buffer::from(entry.data),
40 Entry::MultiplePart(entry) => {
41 Buffer::from_iter(entry.parts.into_iter().map(bytes::Bytes::from))
42 }
43 }
44}
45
/// Reads decoded WAL entries as a stream.
pub(crate) trait WalEntryReader: Send + Sync {
    /// Returns a stream of decoded WAL entries for the provider `ns`.
    ///
    /// `start_id` is the entry id the read starts from; how it is interpreted
    /// is up to the implementation.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream cannot be created. Failures that occur
    /// later are reported as `Err` items of the returned stream.
    fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>>;
}
52
53pub(crate) struct NoopEntryReader;
54
55impl WalEntryReader for NoopEntryReader {
56 fn read(&mut self, _ns: &'_ Provider, _start_id: EntryId) -> Result<WalEntryStream<'static>> {
57 Ok(futures::stream::empty().boxed())
58 }
59}
60
/// Wraps an inner reader `R`; when `R` is a [`RawEntryReader`], this type
/// implements [`WalEntryReader`] by decoding the raw entries that `R` produces.
pub struct LogStoreEntryReader<R> {
    /// The wrapped reader that supplies raw entries.
    reader: R,
}

impl<R> LogStoreEntryReader<R> {
    /// Creates a reader on top of `reader`.
    pub fn new(reader: R) -> Self {
        LogStoreEntryReader { reader }
    }
}
71
72impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
73 fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
74 let LogStoreEntryReader { reader } = self;
75 let mut stream = reader.read(ns, start_id)?;
76
77 let stream = stream! {
78 let mut buffered_entry = None;
79 while let Some(next_entry) = stream.next().await {
80 match buffered_entry.take() {
81 Some(entry) => {
82 yield decode_raw_entry(entry);
83 buffered_entry = Some(next_entry?);
84 },
85 None => {
86 buffered_entry = Some(next_entry?);
87 }
88 };
89 }
90 if let Some(entry) = buffered_entry {
91 if entry.is_complete() {
93 yield decode_raw_entry(entry);
94 }
95 }
96 };
97
98 Ok(Box::pin(stream))
99 }
100}
101
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use api::v1::{Mutation, OpType, WalEntry};
    use futures::TryStreamExt;
    use prost::Message;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader};
    use store_api::logstore::provider::Provider;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::test_util::wal_util::MockRawEntryStream;
    use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};

    /// A complete entry followed by an incomplete tail entry: the tail is
    /// expected to be dropped and only the complete entry decoded.
    #[tokio::test]
    async fn test_tail_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let provider = Provider::kafka_provider("my_topic".to_string());

        let put = Mutation {
            op_type: OpType::Put as i32,
            sequence: 1u64,
            rows: None,
            write_hint: None,
        };
        let wal_entry = WalEntry {
            mutations: vec![put],
        };

        // Split the encoded entry into chunks of half its length.
        let encoded_entry = wal_entry.encode_to_vec();
        let chunk_size = encoded_entry.len() / 2;
        let parts: Vec<Vec<u8>> = encoded_entry
            .chunks(chunk_size)
            .map(|chunk| chunk.to_vec())
            .collect();

        let complete_entry = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 2,
            headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
            parts,
        });
        // The tail corrupted data: it carries only a `Last` header.
        let corrupted_tail = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 1,
            headers: vec![MultiplePartHeader::Last],
            parts: vec![vec![1; 100]],
        });
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![complete_entry, corrupted_tail],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let decoded = reader
            .read(&provider, 0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let entries: Vec<_> = decoded.into_iter().map(|(_, entry)| entry).collect();

        assert_eq!(entries, vec![wal_entry]);
    }

    /// An incomplete entry that is not at the tail must surface as a
    /// `CorruptedEntry` error.
    #[tokio::test]
    async fn test_corrupted_stream() {
        let provider = Provider::kafka_provider("my_topic".to_string());

        // Carries only a `Last` header and is followed by another entry.
        let corrupted_entry = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 1,
            headers: vec![MultiplePartHeader::Last],
            parts: vec![vec![1; 100]],
        });
        // Carries only a `First` header and sits at the tail of the stream.
        let unfinished_tail = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 2,
            headers: vec![MultiplePartHeader::First],
            parts: vec![vec![1; 100]],
        });
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![corrupted_entry, unfinished_tail],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let outcome = reader
            .read(&provider, 0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await;
        let err = outcome.unwrap_err();

        assert_matches!(err, error::Error::CorruptedEntry { .. });
    }
}