mito2/wal/
entry_reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::WalEntry;
16use async_stream::stream;
17use futures::StreamExt;
18use object_store::Buffer;
19use prost::Message;
20use snafu::{ensure, ResultExt};
21use store_api::logstore::entry::Entry;
22use store_api::logstore::provider::Provider;
23
24use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result};
25use crate::wal::raw_entry_reader::RawEntryReader;
26use crate::wal::{EntryId, WalEntryStream};
27
28pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> {
29    let entry_id = raw_entry.entry_id();
30    let region_id = raw_entry.region_id();
31    ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id });
32    let buffer = into_buffer(raw_entry);
33    let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
34    Ok((entry_id, wal_entry))
35}
36
37fn into_buffer(raw_entry: Entry) -> Buffer {
38    match raw_entry {
39        Entry::Naive(entry) => Buffer::from(entry.data),
40        Entry::MultiplePart(entry) => {
41            Buffer::from_iter(entry.parts.into_iter().map(bytes::Bytes::from))
42        }
43    }
44}
45
46/// [WalEntryReader] provides the ability to read and decode entries from the underlying store.
47///
48/// Notes: It will consume the inner stream and only allow invoking the `read` at once.
49pub(crate) trait WalEntryReader: Send + Sync {
50    fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>>;
51}
52
53pub(crate) struct NoopEntryReader;
54
55impl WalEntryReader for NoopEntryReader {
56    fn read(&mut self, _ns: &'_ Provider, _start_id: EntryId) -> Result<WalEntryStream<'static>> {
57        Ok(futures::stream::empty().boxed())
58    }
59}
60
/// A reader that pulls raw [Entry] values from a [RawEntryReader] and decodes
/// each of them into a [WalEntry].
pub struct LogStoreEntryReader<R> {
    /// Source of raw entries; [WalEntryReader] is implemented when `R: RawEntryReader`.
    reader: R,
}

impl<R> LogStoreEntryReader<R> {
    /// Wraps `reader` so its raw entries can be read as decoded WAL entries.
    pub fn new(reader: R) -> Self {
        Self { reader }
    }
}
71
72impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
73    fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
74        let LogStoreEntryReader { reader } = self;
75        let mut stream = reader.read(ns, start_id)?;
76
77        let stream = stream! {
78            let mut buffered_entry = None;
79            while let Some(next_entry) = stream.next().await {
80                match buffered_entry.take() {
81                    Some(entry) => {
82                        yield decode_raw_entry(entry);
83                        buffered_entry = Some(next_entry?);
84                    },
85                    None => {
86                        buffered_entry = Some(next_entry?);
87                    }
88                };
89            }
90            if let Some(entry) = buffered_entry {
91                // Ignores tail corrupted data.
92                if entry.is_complete() {
93                    yield decode_raw_entry(entry);
94                }
95            }
96        };
97
98        Ok(Box::pin(stream))
99    }
100}
101
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use api::v1::{Mutation, OpType, WalEntry};
    use futures::TryStreamExt;
    use prost::Message;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader};
    use store_api::logstore::provider::Provider;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::test_util::wal_util::MockRawEntryStream;
    use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};

    /// A complete two-part entry followed by an incomplete tail entry: the complete
    /// entry is decoded and the incomplete tail is silently dropped.
    #[tokio::test]
    async fn test_tail_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let provider = Provider::kafka_provider("my_topic".to_string());
        let wal_entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 1u64,
                rows: None,
                write_hint: None,
            }],
        };
        let encoded_entry = wal_entry.encode_to_vec();
        // Split into exactly two parts so they line up with the `First`/`Last`
        // headers below. `chunks(len / 2)` would yield three parts for an odd
        // length and panic for a length below two.
        let (first_part, last_part) = encoded_entry.split_at(encoded_entry.len() / 2);
        let parts = vec![first_part.to_vec(), last_part.to_vec()];
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![
                Entry::MultiplePart(MultiplePartEntry {
                    provider: provider.clone(),
                    region_id: RegionId::new(1, 1),
                    entry_id: 2,
                    headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
                    parts,
                }),
                // The tail corrupted data.
                Entry::MultiplePart(MultiplePartEntry {
                    provider: provider.clone(),
                    region_id: RegionId::new(1, 1),
                    entry_id: 1,
                    headers: vec![MultiplePartHeader::Last],
                    parts: vec![vec![1; 100]],
                }),
            ],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let entries = reader
            .read(&provider, 0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
            .into_iter()
            .map(|(_, entry)| entry)
            .collect::<Vec<_>>();

        assert_eq!(entries, vec![wal_entry]);
    }

    /// An incomplete entry that is followed by another entry is reported as
    /// `CorruptedEntry`, even though the trailing incomplete entry is ignored.
    #[tokio::test]
    async fn test_corrupted_stream() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let raw_entry_stream = MockRawEntryStream {
            entries: vec![
                Entry::MultiplePart(MultiplePartEntry {
                    provider: provider.clone(),
                    region_id: RegionId::new(1, 1),
                    entry_id: 1,
                    headers: vec![MultiplePartHeader::Last],
                    parts: vec![vec![1; 100]],
                }),
                Entry::MultiplePart(MultiplePartEntry {
                    provider: provider.clone(),
                    region_id: RegionId::new(1, 1),
                    entry_id: 2,
                    headers: vec![MultiplePartHeader::First],
                    parts: vec![vec![1; 100]],
                }),
            ],
        };

        let mut reader = LogStoreEntryReader::new(raw_entry_stream);
        let err = reader
            .read(&provider, 0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::CorruptedEntry { .. });
    }
}