common_meta/
range_stream.rs1use async_stream::try_stream;
16use common_telemetry::debug;
17use futures::Stream;
18use snafu::ensure;
19
20use crate::error::{self, Result};
21use crate::kv_backend::KvBackendRef;
22use crate::rpc::store::{RangeRequest, RangeResponse};
23use crate::rpc::KeyValue;
24use crate::util::get_next_prefix_key;
25
/// Signature of a decoder turning a raw [`KeyValue`] into a typed item `T`.
pub type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;

/// Default number of key-value pairs requested per range page.
pub const DEFAULT_PAGE_SIZE: usize = 1536;
38
/// Factory driving a paginated range scan against a KV backend.
///
/// Holds the scan cursor (`key`..`range_end`) and adapts its page size
/// downward when the backend rejects a response for exceeding its size limit.
struct PaginationStreamFactory {
    /// Backend to issue `range` requests against.
    kv: KvBackendRef,
    /// Start key of the next page; advanced past the last returned key.
    pub key: Vec<u8>,
    /// Exclusive end key of the scanned range.
    pub range_end: Vec<u8>,
    /// When true, responses carry keys without values.
    pub keys_only: bool,
    /// Page size (limit) for the next request; halved on size-limit errors.
    pub adaptive_page_size: usize,
    /// Whether the backend may still have more data to fetch.
    pub more: bool,
}
60
61impl PaginationStreamFactory {
62 fn new(
63 kv: &KvBackendRef,
64 key: Vec<u8>,
65 range_end: Vec<u8>,
66 page_size: usize,
67 keys_only: bool,
68 more: bool,
69 ) -> Self {
70 Self {
71 kv: kv.clone(),
72 key,
73 range_end,
74 keys_only,
75 more,
76 adaptive_page_size: if page_size == 0 {
77 DEFAULT_ADAPTIVE_PAGE_SIZE
78 } else {
79 page_size
80 },
81 }
82 }
83}
84
/// Fallback page size used when the caller passes `page_size == 0`.
const DEFAULT_ADAPTIVE_PAGE_SIZE: usize = 1024;
86
87impl PaginationStreamFactory {
88 fn try_reduce_adaptive_page_size(&mut self) -> Result<()> {
89 self.adaptive_page_size /= 2;
90
91 ensure!(
92 self.adaptive_page_size != 0,
93 error::UnexpectedSnafu {
94 err_msg: "Exceeded maximum number of adaptive range retries"
95 }
96 );
97
98 Ok(())
99 }
100
101 #[async_recursion::async_recursion]
104 async fn adaptive_range(&mut self, req: RangeRequest) -> Result<RangeResponse> {
105 match self.kv.range(req.clone()).await {
106 Ok(resp) => Ok(resp),
107 Err(err) => {
108 if err.is_exceeded_size_limit() {
109 self.try_reduce_adaptive_page_size()?;
110 debug!("Reset page_size to {}", self.adaptive_page_size);
111
112 self.adaptive_range(req.with_limit(self.adaptive_page_size as i64))
113 .await
114 } else {
115 Err(err)
116 }
117 }
118 }
119 }
120
121 async fn read_next(&mut self) -> Result<Option<RangeResponse>> {
122 if self.more {
123 let resp = self
124 .adaptive_range(RangeRequest {
125 key: self.key.clone(),
126 range_end: self.range_end.clone(),
127 limit: self.adaptive_page_size as i64,
128 keys_only: self.keys_only,
129 })
130 .await?;
131
132 let key = resp
133 .kvs
134 .last()
135 .map(|kv| kv.key.as_slice())
136 .unwrap_or_default();
137
138 let next_key = get_next_prefix_key(key);
139 self.key = next_key;
140 self.more = resp.more;
141 Ok(Some(resp))
142 } else {
143 Ok(None)
144 }
145 }
146}
147
/// Stream adapter that pages through a KV range and decodes each entry to `T`.
pub struct PaginationStream<T> {
    /// Converts a raw [`KeyValue`] into the stream's item type.
    decoder_fn: fn(KeyValue) -> Result<T>,
    /// Paginated scan state shared across page fetches.
    factory: PaginationStreamFactory,
}
152
153impl<T> PaginationStream<T> {
154 pub fn new(
156 kv: KvBackendRef,
157 req: RangeRequest,
158 page_size: usize,
159 decoder_fn: fn(KeyValue) -> Result<T>,
160 ) -> Self {
161 Self {
162 decoder_fn,
163 factory: PaginationStreamFactory::new(
164 &kv,
165 req.key,
166 req.range_end,
167 page_size,
168 req.keys_only,
169 true,
170 ),
171 }
172 }
173}
174
175impl<T> PaginationStream<T> {
176 pub fn into_stream(mut self) -> impl Stream<Item = Result<T>> {
177 try_stream!({
178 while let Some(resp) = self.factory.read_next().await? {
179 for kv in resp.kvs {
180 yield (self.decoder_fn)(kv)?
181 }
182 }
183 })
184 }
185}
186
187#[cfg(test)]
188mod tests {
189
190 use std::assert_matches::assert_matches;
191 use std::collections::BTreeMap;
192 use std::sync::Arc;
193
194 use futures::TryStreamExt;
195
196 use super::*;
197 use crate::error::{Error, Result};
198 use crate::kv_backend::memory::MemoryKvBackend;
199 use crate::kv_backend::KvBackend;
200 use crate::rpc::store::PutRequest;
201
202 fn decoder(kv: KeyValue) -> Result<(Vec<u8>, Vec<u8>)> {
203 Ok((kv.key.clone(), kv.value))
204 }
205
206 #[test]
207 fn test_try_reduce_page_size() {
208 let kv_backend = Arc::new(MemoryKvBackend::<Error>::new()) as _;
209
210 let mut factory =
211 PaginationStreamFactory::new(&kv_backend, vec![], vec![], 2, false, false);
212
213 factory.try_reduce_adaptive_page_size().unwrap();
215
216 assert_matches!(
218 factory.try_reduce_adaptive_page_size().unwrap_err(),
219 error::Error::Unexpected { .. }
220 );
221
222 let mut factory =
223 PaginationStreamFactory::new(&kv_backend, vec![], vec![], 1024, false, false);
224
225 factory.try_reduce_adaptive_page_size().unwrap();
226
227 assert_eq!(factory.adaptive_page_size, 512);
228
229 factory.try_reduce_adaptive_page_size().unwrap();
230
231 assert_eq!(factory.adaptive_page_size, 256);
232
233 let mut factory =
234 PaginationStreamFactory::new(&kv_backend, vec![], vec![], 0, false, false);
235
236 factory.try_reduce_adaptive_page_size().unwrap();
237
238 assert_eq!(factory.adaptive_page_size, DEFAULT_ADAPTIVE_PAGE_SIZE / 2);
239 }
240
241 #[tokio::test]
242 async fn test_range_empty() {
243 let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());
244
245 let stream = PaginationStream::new(
246 kv_backend.clone(),
247 RangeRequest {
248 key: b"a".to_vec(),
249 ..Default::default()
250 },
251 DEFAULT_PAGE_SIZE,
252 decoder,
253 )
254 .into_stream();
255 let kv = stream.try_collect::<Vec<_>>().await.unwrap();
256
257 assert!(kv.is_empty());
258 }
259
260 #[tokio::test]
261 async fn test_range() {
262 let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());
263 let total = 26;
264
265 let mut expected = BTreeMap::<Vec<u8>, ()>::new();
266 for i in 0..total {
267 let key = vec![97 + i];
268
269 assert!(kv_backend
270 .put(PutRequest {
271 key: key.clone(),
272 value: key.clone(),
273 ..Default::default()
274 })
275 .await
276 .is_ok());
277
278 expected.insert(key, ());
279 }
280
281 let key = b"a".to_vec();
282 let range_end = b"f".to_vec();
283
284 let stream = PaginationStream::new(
285 kv_backend.clone(),
286 RangeRequest {
287 key,
288 range_end,
289 ..Default::default()
290 },
291 2,
292 decoder,
293 );
294 let kv = stream
295 .into_stream()
296 .try_collect::<Vec<_>>()
297 .await
298 .unwrap()
299 .into_iter()
300 .map(|kv| kv.0)
301 .collect::<Vec<_>>();
302
303 assert_eq!(vec![vec![97], vec![98], vec![99], vec![100], vec![101]], kv);
304 }
305}