mito2/wal/
entry_reader.rs1use api::v1::WalEntry;
16use async_stream::stream;
17use common_telemetry::tracing::warn;
18use futures::StreamExt;
19use object_store::Buffer;
20use prost::Message;
21use snafu::ResultExt;
22use store_api::logstore::entry::Entry;
23use store_api::logstore::provider::Provider;
24
25use crate::error::{DecodeWalSnafu, Result};
26use crate::wal::raw_entry_reader::RawEntryReader;
27use crate::wal::{EntryId, WalEntryStream};
28
29pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> {
33 let entry_id = raw_entry.entry_id();
34 let region_id = raw_entry.region_id();
35 debug_assert!(raw_entry.is_complete());
36 let buffer = into_buffer(raw_entry);
37 let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
38 Ok((entry_id, wal_entry))
39}
40
41fn into_buffer(raw_entry: Entry) -> Buffer {
42 match raw_entry {
43 Entry::Naive(entry) => Buffer::from(entry.data),
44 Entry::MultiplePart(entry) => {
45 Buffer::from_iter(entry.parts.into_iter().map(bytes::Bytes::from))
46 }
47 }
48}
49
50pub(crate) trait WalEntryReader: Send + Sync {
54 fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>>;
55}
56
57pub(crate) struct NoopEntryReader;
58
59impl WalEntryReader for NoopEntryReader {
60 fn read(&mut self, _ns: &'_ Provider, _start_id: EntryId) -> Result<WalEntryStream<'static>> {
61 Ok(futures::stream::empty().boxed())
62 }
63}
64
/// Wraps a reader of type `R`.
///
/// When `R` is a [`RawEntryReader`], this type implements [`WalEntryReader`] by decoding
/// the raw entries that the wrapped reader produces.
pub struct LogStoreEntryReader<R> {
    /// The wrapped reader that supplies the raw entries.
    reader: R,
}

impl<R> LogStoreEntryReader<R> {
    /// Creates a reader that takes ownership of `reader`.
    pub fn new(reader: R) -> Self {
        LogStoreEntryReader { reader }
    }
}
75
76impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
77 fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
78 let LogStoreEntryReader { reader } = self;
79 let mut stream = reader.read(ns, start_id)?;
80
81 let stream = stream! {
82 let mut buffered_entry: Option<Entry> = None;
83 while let Some(next_entry) = stream.next().await {
84 match buffered_entry.take() {
85 Some(entry) => {
86 if entry.is_complete() {
87 yield decode_raw_entry(entry);
88 } else {
89 warn!("Ignoring incomplete entry: {}", entry);
90 }
91 buffered_entry = Some(next_entry?);
92 },
93 None => {
94 buffered_entry = Some(next_entry?);
95 }
96 };
97 }
98 if let Some(entry) = buffered_entry {
99 if entry.is_complete() {
101 yield decode_raw_entry(entry);
102 } else {
103 warn!("Ignoring incomplete entry: {}", entry);
104 }
105 }
106 };
107
108 Ok(Box::pin(stream))
109 }
110}
111
#[cfg(test)]
mod tests {

    use api::v1::{Mutation, OpType, WalEntry};
    use futures::TryStreamExt;
    use prost::Message;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader};
    use store_api::logstore::provider::Provider;
    use store_api::storage::RegionId;

    use crate::test_util::wal_util::MockRawEntryStream;
    use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};

    /// A multi-part entry with `First` and `Last` headers followed by a trailing entry
    /// that only has a `Last` header: only the first one is expected to be decoded.
    #[tokio::test]
    async fn test_tail_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let provider = Provider::kafka_provider("my_topic".to_string());

        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 1,
            rows: None,
            write_hint: None,
        };
        let wal_entry = WalEntry {
            mutations: vec![mutation],
            bulk_entries: vec![],
        };

        // Spread the encoded WAL entry over several parts of a multi-part entry.
        let encoded_entry = wal_entry.encode_to_vec();
        let half = encoded_entry.len() / 2;
        let parts: Vec<_> = encoded_entry
            .chunks(half)
            .map(|chunk| chunk.into())
            .collect();

        let first_entry = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 2,
            headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
            parts,
        });
        let tail_entry = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 1,
            headers: vec![MultiplePartHeader::Last],
            parts: vec![vec![1; 100]],
        });
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![first_entry, tail_entry],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let decoded = reader
            .read(&provider, 0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let entries: Vec<_> = decoded.into_iter().map(|(_, entry)| entry).collect();

        assert_eq!(entries, vec![wal_entry]);
    }

    /// Two multi-part entries that each carry a single header (`Last`, then `First`):
    /// the reader is expected to produce no entry and no error.
    #[tokio::test]
    async fn test_corrupted_stream() {
        let provider = Provider::kafka_provider("my_topic".to_string());

        let last_only = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 1,
            headers: vec![MultiplePartHeader::Last],
            parts: vec![vec![1; 100]],
        });
        let first_only = Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id: 2,
            headers: vec![MultiplePartHeader::First],
            parts: vec![vec![1; 100]],
        });
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![last_only, first_only],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let stream = reader.read(&provider, 0).unwrap();
        let entries = stream.try_collect::<Vec<_>>().await.unwrap();

        assert!(entries.is_empty());
    }
}