common_meta/
range_stream.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_stream::try_stream;
16use common_telemetry::debug;
17use futures::Stream;
18use snafu::ensure;
19
20use crate::error::{self, Result};
21use crate::kv_backend::KvBackendRef;
22use crate::rpc::store::{RangeRequest, RangeResponse};
23use crate::rpc::KeyValue;
24use crate::util::get_next_prefix_key;
25
26pub type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;
27
/// The default page size of a range request.
///
/// The page size is bounded by the gRPC message size limit of the upstream
/// KvStore server (e.g., etcd's default gRPC message size limit is 4 MiB).
///
/// Almost all metadata entries are smaller than 2700 bytes, so a page of 1536
/// entries stays below that limit (1536 × 2700 B ≈ 3.96 MiB < 4 MiB). The page
/// size is therefore fixed at 1536.
///
/// TODO(weny): Consider updating the default page size dynamically.
pub const DEFAULT_PAGE_SIZE: usize = 1536;
38
39struct PaginationStreamFactory {
40    kv: KvBackendRef,
41    /// key is the first key for the range, If range_end is not given, the
42    /// request only looks up key.
43    pub key: Vec<u8>,
44    /// range_end is the upper bound on the requested range [key, range_end).
45    /// If range_end is '\0', the range is all keys >= key.
46    /// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
47    /// then the range request gets all keys prefixed with key.
48    /// If both key and range_end are '\0', then the range request returns all
49    /// keys.
50    pub range_end: Vec<u8>,
51
52    /// keys_only when set returns only the keys and not the values.
53    pub keys_only: bool,
54
55    /// It reduces the page size if the response size exceeds the limit.
56    pub adaptive_page_size: usize,
57
58    pub more: bool,
59}
60
61impl PaginationStreamFactory {
62    fn new(
63        kv: &KvBackendRef,
64        key: Vec<u8>,
65        range_end: Vec<u8>,
66        page_size: usize,
67        keys_only: bool,
68        more: bool,
69    ) -> Self {
70        Self {
71            kv: kv.clone(),
72            key,
73            range_end,
74            keys_only,
75            more,
76            adaptive_page_size: if page_size == 0 {
77                DEFAULT_ADAPTIVE_PAGE_SIZE
78            } else {
79                page_size
80            },
81        }
82    }
83}
84
/// Page size used when a caller asks for a page size of `0`.
const DEFAULT_ADAPTIVE_PAGE_SIZE: usize = 1 << 10;
86
87impl PaginationStreamFactory {
88    fn try_reduce_adaptive_page_size(&mut self) -> Result<()> {
89        self.adaptive_page_size /= 2;
90
91        ensure!(
92            self.adaptive_page_size != 0,
93            error::UnexpectedSnafu {
94                err_msg: "Exceeded maximum number of adaptive range retries"
95            }
96        );
97
98        Ok(())
99    }
100
101    /// Decreases the `page size` if the response message size exceeds the limitation.
102    /// TODO(weny): Considers to add an E2e test.
103    #[async_recursion::async_recursion]
104    async fn adaptive_range(&mut self, req: RangeRequest) -> Result<RangeResponse> {
105        match self.kv.range(req.clone()).await {
106            Ok(resp) => Ok(resp),
107            Err(err) => {
108                if err.is_exceeded_size_limit() {
109                    self.try_reduce_adaptive_page_size()?;
110                    debug!("Reset page_size to {}", self.adaptive_page_size);
111
112                    self.adaptive_range(req.with_limit(self.adaptive_page_size as i64))
113                        .await
114                } else {
115                    Err(err)
116                }
117            }
118        }
119    }
120
121    async fn read_next(&mut self) -> Result<Option<RangeResponse>> {
122        if self.more {
123            let resp = self
124                .adaptive_range(RangeRequest {
125                    key: self.key.clone(),
126                    range_end: self.range_end.clone(),
127                    limit: self.adaptive_page_size as i64,
128                    keys_only: self.keys_only,
129                })
130                .await?;
131
132            let key = resp
133                .kvs
134                .last()
135                .map(|kv| kv.key.as_slice())
136                .unwrap_or_default();
137
138            let next_key = get_next_prefix_key(key);
139            self.key = next_key;
140            self.more = resp.more;
141            Ok(Some(resp))
142        } else {
143            Ok(None)
144        }
145    }
146}
147
148pub struct PaginationStream<T> {
149    decoder_fn: fn(KeyValue) -> Result<T>,
150    factory: PaginationStreamFactory,
151}
152
153impl<T> PaginationStream<T> {
154    /// Returns a new [PaginationStream].
155    pub fn new(
156        kv: KvBackendRef,
157        req: RangeRequest,
158        page_size: usize,
159        decoder_fn: fn(KeyValue) -> Result<T>,
160    ) -> Self {
161        Self {
162            decoder_fn,
163            factory: PaginationStreamFactory::new(
164                &kv,
165                req.key,
166                req.range_end,
167                page_size,
168                req.keys_only,
169                true,
170            ),
171        }
172    }
173}
174
175impl<T> PaginationStream<T> {
176    pub fn into_stream(mut self) -> impl Stream<Item = Result<T>> {
177        try_stream!({
178            while let Some(resp) = self.factory.read_next().await? {
179                for kv in resp.kvs {
180                    yield (self.decoder_fn)(kv)?
181                }
182            }
183        })
184    }
185}
186
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use futures::TryStreamExt;

    use super::*;
    use crate::error::{Error, Result};
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Decodes a [KeyValue] into an owned `(key, value)` pair.
    fn decoder(kv: KeyValue) -> Result<(Vec<u8>, Vec<u8>)> {
        // `kv` is owned, so both fields can be moved out without cloning.
        Ok((kv.key, kv.value))
    }

    #[test]
    fn test_try_reduce_page_size() {
        let kv_backend = Arc::new(MemoryKvBackend::<Error>::new()) as _;

        let mut factory =
            PaginationStreamFactory::new(&kv_backend, vec![], vec![], 2, false, false);

        // new adaptive page size: 1
        factory.try_reduce_adaptive_page_size().unwrap();

        // new adaptive page size: 0
        assert_matches!(
            factory.try_reduce_adaptive_page_size().unwrap_err(),
            error::Error::Unexpected { .. }
        );

        let mut factory =
            PaginationStreamFactory::new(&kv_backend, vec![], vec![], 1024, false, false);

        factory.try_reduce_adaptive_page_size().unwrap();

        assert_eq!(factory.adaptive_page_size, 512);

        factory.try_reduce_adaptive_page_size().unwrap();

        assert_eq!(factory.adaptive_page_size, 256);

        let mut factory =
            PaginationStreamFactory::new(&kv_backend, vec![], vec![], 0, false, false);

        factory.try_reduce_adaptive_page_size().unwrap();

        assert_eq!(factory.adaptive_page_size, DEFAULT_ADAPTIVE_PAGE_SIZE / 2);
    }

    #[tokio::test]
    async fn test_range_empty() {
        let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());

        let stream = PaginationStream::new(
            kv_backend.clone(),
            RangeRequest {
                key: b"a".to_vec(),
                ..Default::default()
            },
            DEFAULT_PAGE_SIZE,
            decoder,
        )
        .into_stream();
        let kv = stream.try_collect::<Vec<_>>().await.unwrap();

        assert!(kv.is_empty());
    }

    #[tokio::test]
    async fn test_range() {
        let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());
        let total = 26;

        // Every stored pair ("a" => "a", ..., "z" => "z"), ordered by key.
        let mut expected = BTreeMap::<Vec<u8>, Vec<u8>>::new();
        for i in 0..total {
            let key = vec![97 + i];

            assert!(kv_backend
                .put(PutRequest {
                    key: key.clone(),
                    value: key.clone(),
                    ..Default::default()
                })
                .await
                .is_ok());

            expected.insert(key.clone(), key);
        }

        let key = b"a".to_vec();
        let range_end = b"f".to_vec();

        // Only the pairs inside `[key, range_end)` must come back: "a" to "e".
        let expected_kvs = expected
            .range(key.clone()..range_end.clone())
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<Vec<_>>();
        assert_eq!(5, expected_kvs.len());

        // Page sizes smaller than, dividing, equal to and larger than the range size.
        for page_size in [1, 2, 3, 5, 6, DEFAULT_PAGE_SIZE] {
            let stream = PaginationStream::new(
                kv_backend.clone(),
                RangeRequest {
                    key: key.clone(),
                    range_end: range_end.clone(),
                    ..Default::default()
                },
                page_size,
                decoder,
            );
            let kvs = stream
                .into_stream()
                .try_collect::<Vec<_>>()
                .await
                .unwrap();

            assert_eq!(expected_kvs, kvs, "page_size: {page_size}");
        }
    }
}