mito2/wal/
entry_reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use api::v1::WalEntry;
16use async_stream::stream;
17use common_telemetry::tracing::warn;
18use futures::StreamExt;
19use object_store::Buffer;
20use prost::Message;
21use snafu::ResultExt;
22use store_api::logstore::entry::Entry;
23use store_api::logstore::provider::Provider;
24
25use crate::error::{DecodeWalSnafu, Result};
26use crate::wal::raw_entry_reader::RawEntryReader;
27use crate::wal::{EntryId, WalEntryStream};
28
29/// Decodes the [Entry] into [WalEntry].
30///
31/// The caller must ensure the [Entry] is complete.
32pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> {
33    let entry_id = raw_entry.entry_id();
34    let region_id = raw_entry.region_id();
35    debug_assert!(raw_entry.is_complete());
36    let buffer = into_buffer(raw_entry);
37    let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
38    Ok((entry_id, wal_entry))
39}
40
41fn into_buffer(raw_entry: Entry) -> Buffer {
42    match raw_entry {
43        Entry::Naive(entry) => Buffer::from(entry.data),
44        Entry::MultiplePart(entry) => {
45            Buffer::from_iter(entry.parts.into_iter().map(bytes::Bytes::from))
46        }
47    }
48}
49
50/// [WalEntryReader] provides the ability to read and decode entries from the underlying store.
51///
52/// Notes: It will consume the inner stream and only allow invoking the `read` at once.
53pub(crate) trait WalEntryReader: Send + Sync {
54    fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>>;
55}
56
57pub(crate) struct NoopEntryReader;
58
59impl WalEntryReader for NoopEntryReader {
60    fn read(&mut self, _ns: &'_ Provider, _start_id: EntryId) -> Result<WalEntryStream<'static>> {
61        Ok(futures::stream::empty().boxed())
62    }
63}
64
/// Reads [Entry] values from a [RawEntryReader] and decodes each of them into a [WalEntry].
pub struct LogStoreEntryReader<R> {
    /// The underlying source of raw entries.
    reader: R,
}

impl<R> LogStoreEntryReader<R> {
    /// Wraps `reader` so that the entries it produces can be decoded.
    pub fn new(reader: R) -> Self {
        LogStoreEntryReader { reader }
    }
}
75
76impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
77    fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
78        let LogStoreEntryReader { reader } = self;
79        let mut stream = reader.read(ns, start_id)?;
80
81        let stream = stream! {
82            let mut buffered_entry: Option<Entry> = None;
83            while let Some(next_entry) = stream.next().await {
84                match buffered_entry.take() {
85                    Some(entry) => {
86                        if entry.is_complete() {
87                            yield decode_raw_entry(entry);
88                        } else {
89                            warn!("Ignoring incomplete entry: {}", entry);
90                        }
91                        buffered_entry = Some(next_entry?);
92                    },
93                    None => {
94                        buffered_entry = Some(next_entry?);
95                    }
96                };
97            }
98            if let Some(entry) = buffered_entry {
99                // Ignores tail corrupted data.
100                if entry.is_complete() {
101                    yield decode_raw_entry(entry);
102                } else {
103                    warn!("Ignoring incomplete entry: {}", entry);
104                }
105            }
106        };
107
108        Ok(Box::pin(stream))
109    }
110}
111
#[cfg(test)]
mod tests {

    use api::v1::{Mutation, OpType, WalEntry};
    use futures::TryStreamExt;
    use prost::Message;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader};
    use store_api::logstore::provider::Provider;
    use store_api::storage::RegionId;

    use crate::test_util::wal_util::MockRawEntryStream;
    use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};

    /// Builds a multi-part entry for region `(1, 1)` with the given headers and parts.
    fn multi_part_entry(
        provider: &Provider,
        entry_id: u64,
        headers: Vec<MultiplePartHeader>,
        parts: Vec<Vec<u8>>,
    ) -> Entry {
        Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id: RegionId::new(1, 1),
            entry_id,
            headers,
            parts,
        })
    }

    /// Builds an incomplete entry: a single header and one 100-byte filler part.
    fn incomplete_entry(provider: &Provider, entry_id: u64, header: MultiplePartHeader) -> Entry {
        multi_part_entry(provider, entry_id, vec![header], vec![vec![1; 100]])
    }

    /// A complete entry followed by an incomplete tail: only the former is decoded.
    #[tokio::test]
    async fn test_tail_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let provider = Provider::kafka_provider("my_topic".to_string());
        let wal_entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 1u64,
                rows: None,
                write_hint: None,
            }],
            bulk_entries: vec![],
        };
        // Split the encoded entry into chunks to form a multi-part entry.
        let encoded_entry = wal_entry.encode_to_vec();
        let chunk_size = encoded_entry.len() / 2;
        let parts = encoded_entry
            .chunks(chunk_size)
            .map(|chunk| chunk.to_vec())
            .collect::<Vec<_>>();
        let complete = multi_part_entry(
            &provider,
            2,
            vec![MultiplePartHeader::First, MultiplePartHeader::Last],
            parts,
        );
        // The tail incomplete entry.
        let tail = incomplete_entry(&provider, 1, MultiplePartHeader::Last);
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![complete, tail],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let entries: Vec<WalEntry> = reader
            .read(&provider, 0)
            .unwrap()
            .map_ok(|(_, entry)| entry)
            .try_collect()
            .await
            .unwrap();

        assert_eq!(entries, vec![wal_entry]);
    }

    /// A stream made only of incomplete entries decodes to nothing.
    #[tokio::test]
    async fn test_corrupted_stream() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![
                // The incomplete entry.
                incomplete_entry(&provider, 1, MultiplePartHeader::Last),
                incomplete_entry(&provider, 2, MultiplePartHeader::First),
            ],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let decoded = reader
            .read(&provider, 0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert!(decoded.is_empty());
    }
}