1use std::sync::Arc;
16
17use async_stream::try_stream;
18use common_error::ext::BoxedError;
19use futures::stream::BoxStream;
20use snafu::ResultExt;
21use store_api::logstore::entry::Entry;
22use store_api::logstore::provider::Provider;
23use store_api::logstore::{LogStore, WalIndex};
24use store_api::storage::RegionId;
25use tokio_stream::StreamExt;
26
27use crate::error::{self, Result};
28use crate::wal::EntryId;
29
30pub type EntryStream<'a> = BoxStream<'a, Result<Entry>>;
32
33pub(crate) trait RawEntryReader: Send + Sync {
35 fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>>;
36}
37
38pub struct LogStoreRawEntryReader<S> {
40 store: Arc<S>,
41 wal_index: Option<WalIndex>,
42}
43
44impl<S> LogStoreRawEntryReader<S> {
45 pub fn new(store: Arc<S>) -> Self {
46 Self {
47 store,
48 wal_index: None,
49 }
50 }
51
52 pub fn with_wal_index(mut self, wal_index: WalIndex) -> Self {
53 self.wal_index = Some(wal_index);
54 self
55 }
56}
57
58impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
59 fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
60 let store = self.store.clone();
61 let provider = provider.clone();
62 let wal_index = self.wal_index;
63 let stream = try_stream!({
64 let mut stream = store
65 .read(&provider, start_id, wal_index)
66 .await
67 .map_err(BoxedError::new)
68 .with_context(|_| error::ReadWalSnafu {
69 provider: provider.clone(),
70 })?;
71
72 while let Some(entries) = stream.next().await {
73 let entries =
74 entries
75 .map_err(BoxedError::new)
76 .with_context(|_| error::ReadWalSnafu {
77 provider: provider.clone(),
78 })?;
79
80 for entry in entries {
81 yield entry
82 }
83 }
84 });
85
86 Ok(Box::pin(stream))
87 }
88}
89
90pub struct RegionRawEntryReader<R> {
92 reader: R,
93 region_id: RegionId,
94}
95
96impl<R> RegionRawEntryReader<R>
97where
98 R: RawEntryReader,
99{
100 pub fn new(reader: R, region_id: RegionId) -> Self {
101 Self { reader, region_id }
102 }
103}
104
105impl<R> RawEntryReader for RegionRawEntryReader<R>
106where
107 R: RawEntryReader,
108{
109 fn read(&self, ctx: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
110 let mut stream = self.reader.read(ctx, start_id)?;
111 let region_id = self.region_id;
112
113 let stream = try_stream!({
114 while let Some(entry) = stream.next().await {
115 let entry = entry?;
116 if entry.region_id() == region_id {
117 yield entry
118 }
119 }
120 });
121
122 Ok(Box::pin(stream))
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use std::sync::Arc;
129
130 use futures::{stream, TryStreamExt};
131 use store_api::logstore::entry::{Entry, NaiveEntry};
132 use store_api::logstore::{
133 AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex,
134 };
135 use store_api::storage::RegionId;
136
137 use super::*;
138 use crate::error;
139
140 #[derive(Debug)]
141 struct MockLogStore {
142 entries: Vec<Entry>,
143 }
144
145 #[async_trait::async_trait]
146 impl LogStore for MockLogStore {
147 type Error = error::Error;
148
149 async fn stop(&self) -> Result<(), Self::Error> {
150 unreachable!()
151 }
152
153 async fn append_batch(
154 &self,
155 _entries: Vec<Entry>,
156 ) -> Result<AppendBatchResponse, Self::Error> {
157 unreachable!()
158 }
159
160 async fn read(
161 &self,
162 _provider: &Provider,
163 _id: EntryId,
164 _index: Option<WalIndex>,
165 ) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error> {
166 Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())])))
167 }
168
169 async fn create_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> {
170 unreachable!()
171 }
172
173 async fn delete_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> {
174 unreachable!()
175 }
176
177 async fn list_namespaces(&self) -> Result<Vec<Provider>, Self::Error> {
178 unreachable!()
179 }
180
181 async fn obsolete(
182 &self,
183 _provider: &Provider,
184 _region_id: RegionId,
185 _entry_id: EntryId,
186 ) -> Result<(), Self::Error> {
187 unreachable!()
188 }
189
190 fn entry(
191 &self,
192 _data: &mut Vec<u8>,
193 _entry_id: EntryId,
194 _region_id: RegionId,
195 _provider: &Provider,
196 ) -> Result<Entry, Self::Error> {
197 unreachable!()
198 }
199
200 fn high_watermark(&self, _provider: &Provider) -> Result<EntryId, Self::Error> {
201 unreachable!()
202 }
203 }
204
205 #[tokio::test]
206 async fn test_raw_entry_reader() {
207 let provider = Provider::raft_engine_provider(RegionId::new(1024, 1).as_u64());
208 let expected_entries = vec![Entry::Naive(NaiveEntry {
209 provider: provider.clone(),
210 region_id: RegionId::new(1024, 1),
211 entry_id: 1,
212 data: vec![1],
213 })];
214 let store = MockLogStore {
215 entries: expected_entries.clone(),
216 };
217
218 let reader = LogStoreRawEntryReader::new(Arc::new(store));
219 let entries = reader
220 .read(&provider, 0)
221 .unwrap()
222 .try_collect::<Vec<_>>()
223 .await
224 .unwrap();
225 assert_eq!(expected_entries, entries);
226 }
227
228 #[tokio::test]
229 async fn test_raw_entry_reader_filter() {
230 let provider = Provider::raft_engine_provider(RegionId::new(1024, 1).as_u64());
231 let all_entries = vec![
232 Entry::Naive(NaiveEntry {
233 provider: provider.clone(),
234 region_id: RegionId::new(1024, 1),
235 entry_id: 1,
236 data: vec![1],
237 }),
238 Entry::Naive(NaiveEntry {
239 provider: provider.clone(),
240 region_id: RegionId::new(1024, 2),
241 entry_id: 2,
242 data: vec![2],
243 }),
244 Entry::Naive(NaiveEntry {
245 provider: provider.clone(),
246 region_id: RegionId::new(1024, 3),
247 entry_id: 3,
248 data: vec![3],
249 }),
250 ];
251 let store = MockLogStore {
252 entries: all_entries.clone(),
253 };
254
255 let expected_region_id = RegionId::new(1024, 3);
256 let reader = RegionRawEntryReader::new(
257 LogStoreRawEntryReader::new(Arc::new(store)),
258 expected_region_id,
259 );
260 let entries = reader
261 .read(&provider, 0)
262 .unwrap()
263 .try_collect::<Vec<_>>()
264 .await
265 .unwrap();
266 assert_eq!(
267 all_entries
268 .into_iter()
269 .filter(|entry| entry.region_id() == expected_region_id)
270 .collect::<Vec<_>>(),
271 entries
272 );
273 }
274}