mito2/wal/
raw_entry_reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_stream::try_stream;
18use common_error::ext::BoxedError;
19use futures::stream::BoxStream;
20use snafu::ResultExt;
21use store_api::logstore::entry::Entry;
22use store_api::logstore::provider::Provider;
23use store_api::logstore::{LogStore, WalIndex};
24use store_api::storage::RegionId;
25use tokio_stream::StreamExt;
26
27use crate::error::{self, Result};
28use crate::wal::EntryId;
29
/// A boxed stream that yields [Entry].
///
/// Every item is a `Result<Entry>`: either the next entry or the error
/// raised while producing it.
pub type EntryStream<'a> = BoxStream<'a, Result<Entry>>;
32
/// [RawEntryReader] provides the ability to read [Entry] from the underlying [LogStore].
///
/// Implementations must be `Send + Sync`.
pub(crate) trait RawEntryReader: Send + Sync {
    /// Returns an [EntryStream] over the entries of `provider`.
    ///
    /// `start_id` is the entry id the read begins at.
    /// NOTE(review): whether `start_id` itself is included is decided by the
    /// implementation / underlying store — confirm before relying on it.
    ///
    /// The returned stream is `'static`, so it must not borrow from `self` or
    /// `provider`.
    fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>>;
}
37
/// Implement the [RawEntryReader] for the [LogStore].
pub struct LogStoreRawEntryReader<S> {
    /// Shared handle to the underlying log store.
    store: Arc<S>,
    /// Optional [WalIndex] passed along to the store on every read.
    /// `None` unless one was supplied through `with_wal_index`.
    wal_index: Option<WalIndex>,
}
43
44impl<S> LogStoreRawEntryReader<S> {
45    pub fn new(store: Arc<S>) -> Self {
46        Self {
47            store,
48            wal_index: None,
49        }
50    }
51
52    pub fn with_wal_index(mut self, wal_index: WalIndex) -> Self {
53        self.wal_index = Some(wal_index);
54        self
55    }
56}
57
58impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
59    fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
60        let store = self.store.clone();
61        let provider = provider.clone();
62        let wal_index = self.wal_index;
63        let stream = try_stream!({
64            let mut stream = store
65                .read(&provider, start_id, wal_index)
66                .await
67                .map_err(BoxedError::new)
68                .with_context(|_| error::ReadWalSnafu {
69                    provider: provider.clone(),
70                })?;
71
72            while let Some(entries) = stream.next().await {
73                let entries =
74                    entries
75                        .map_err(BoxedError::new)
76                        .with_context(|_| error::ReadWalSnafu {
77                            provider: provider.clone(),
78                        })?;
79
80                for entry in entries {
81                    yield entry
82                }
83            }
84        });
85
86        Ok(Box::pin(stream))
87    }
88}
89
/// A [RawEntryReader] that only yields the [Entry]s belonging to a specific region.
pub struct RegionRawEntryReader<R> {
    /// The wrapped reader that supplies the unfiltered entries.
    reader: R,
    /// Only entries whose region id equals this value are yielded.
    region_id: RegionId,
}
95
96impl<R> RegionRawEntryReader<R>
97where
98    R: RawEntryReader,
99{
100    pub fn new(reader: R, region_id: RegionId) -> Self {
101        Self { reader, region_id }
102    }
103}
104
105impl<R> RawEntryReader for RegionRawEntryReader<R>
106where
107    R: RawEntryReader,
108{
109    fn read(&self, ctx: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
110        let mut stream = self.reader.read(ctx, start_id)?;
111        let region_id = self.region_id;
112
113        let stream = try_stream!({
114            while let Some(entry) = stream.next().await {
115                let entry = entry?;
116                if entry.region_id() == region_id {
117                    yield entry
118                }
119            }
120        });
121
122        Ok(Box::pin(stream))
123    }
124}
125
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::{stream, TryStreamExt};
    use store_api::logstore::entry::{Entry, NaiveEntry};
    use store_api::logstore::{
        AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex,
    };
    use store_api::storage::RegionId;

    use super::*;
    use crate::error;

    /// A [LogStore] test double: `read` returns the configured entries as one
    /// batch, every other operation is `unreachable!()`.
    #[derive(Debug)]
    struct MockLogStore {
        entries: Vec<Entry>,
    }

    #[async_trait::async_trait]
    impl LogStore for MockLogStore {
        type Error = error::Error;

        async fn stop(&self) -> Result<(), Self::Error> {
            unreachable!()
        }

        async fn append_batch(
            &self,
            _entries: Vec<Entry>,
        ) -> Result<AppendBatchResponse, Self::Error> {
            unreachable!()
        }

        /// Ignores all arguments and yields a single batch holding a copy of
        /// every configured entry.
        async fn read(
            &self,
            _provider: &Provider,
            _id: EntryId,
            _index: Option<WalIndex>,
        ) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error> {
            let batch = self.entries.clone();
            Ok(Box::pin(stream::iter(vec![Ok(batch)])))
        }

        async fn create_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> {
            unreachable!()
        }

        async fn delete_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> {
            unreachable!()
        }

        async fn list_namespaces(&self) -> Result<Vec<Provider>, Self::Error> {
            unreachable!()
        }

        async fn obsolete(
            &self,
            _provider: &Provider,
            _region_id: RegionId,
            _entry_id: EntryId,
        ) -> Result<(), Self::Error> {
            unreachable!()
        }

        fn entry(
            &self,
            _data: &mut Vec<u8>,
            _entry_id: EntryId,
            _region_id: RegionId,
            _provider: &Provider,
        ) -> Result<Entry, Self::Error> {
            unreachable!()
        }

        fn high_watermark(&self, _provider: &Provider) -> Result<EntryId, Self::Error> {
            unreachable!()
        }
    }

    /// Builds an `Entry::Naive` fixture from its parts.
    fn naive_entry(
        provider: &Provider,
        region_id: RegionId,
        entry_id: EntryId,
        data: Vec<u8>,
    ) -> Entry {
        Entry::Naive(NaiveEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            data,
        })
    }

    /// Reads from entry id 0 and drains the whole stream, panicking on any error.
    async fn read_all<R: RawEntryReader>(reader: &R, provider: &Provider) -> Vec<Entry> {
        reader
            .read(provider, 0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_raw_entry_reader() {
        let region_id = RegionId::new(1024, 1);
        let provider = Provider::raft_engine_provider(region_id.as_u64());
        let expected_entries = vec![naive_entry(&provider, region_id, 1, vec![1])];

        let store = MockLogStore {
            entries: expected_entries.clone(),
        };
        let reader = LogStoreRawEntryReader::new(Arc::new(store));

        let entries = read_all(&reader, &provider).await;
        assert_eq!(expected_entries, entries);
    }

    #[tokio::test]
    async fn test_raw_entry_reader_filter() {
        let provider = Provider::raft_engine_provider(RegionId::new(1024, 1).as_u64());
        // Three entries, one per region number 1..=3, with matching ids and payloads.
        let all_entries: Vec<Entry> = (1u8..=3)
            .map(|n| {
                naive_entry(
                    &provider,
                    RegionId::new(1024, u32::from(n)),
                    EntryId::from(n),
                    vec![n],
                )
            })
            .collect();

        let store = MockLogStore {
            entries: all_entries.clone(),
        };
        let expected_region_id = RegionId::new(1024, 3);
        let reader = RegionRawEntryReader::new(
            LogStoreRawEntryReader::new(Arc::new(store)),
            expected_region_id,
        );

        let entries = read_all(&reader, &provider).await;
        let expected_entries: Vec<Entry> = all_entries
            .into_iter()
            .filter(|entry| entry.region_id() == expected_region_id)
            .collect();
        assert_eq!(expected_entries, entries);
    }
}