Skip to main content

mito2/sst/index/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io;
16use std::ops::Range;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use async_trait::async_trait;
21use bytes::{BufMut, Bytes};
22use common_base::range_read::{Metadata, RangeReader, SizeAwareRangeReader};
23use futures::{AsyncRead, AsyncSeek, AsyncWrite};
24use object_store::ObjectStore;
25use pin_project::pin_project;
26use prometheus::IntCounter;
27use snafu::ResultExt;
28
29use crate::error::{OpenDalSnafu, Result};
30
/// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring
/// metrics such as bytes read, bytes written, and the number of seek operations.
///
/// The readers and writers it creates record their activity into
/// caller-supplied [`IntCounter`]s.
///
/// TODO: Consider refactoring `InstrumentedStore` to use async in trait instead of `AsyncRead`.
#[derive(Clone)]
pub(crate) struct InstrumentedStore {
    /// The underlying object store.
    object_store: ObjectStore,
    /// The size of the write buffer. When `Some`, it is passed as the chunk
    /// size of writers created by this store; when `None`, writers are created
    /// without an explicit chunk size.
    write_buffer_size: Option<usize>,
}
42
43impl InstrumentedStore {
44    /// Create a new `InstrumentedStore`.
45    pub fn new(object_store: ObjectStore) -> Self {
46        Self {
47            object_store,
48            write_buffer_size: None,
49        }
50    }
51
52    pub fn store(&self) -> &ObjectStore {
53        &self.object_store
54    }
55
56    /// Set the size of the write buffer.
57    pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
58        self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);
59        self
60    }
61
62    /// Returns an [`InstrumentedRangeReader`] for the given path.
63    /// Metrics like the number of bytes read are recorded using the provided `IntCounter`.
64    pub async fn range_reader<'a>(
65        &self,
66        path: &str,
67        read_byte_count: &'a IntCounter,
68        read_count: &'a IntCounter,
69    ) -> Result<InstrumentedRangeReader<'a>> {
70        Ok(InstrumentedRangeReader {
71            store: self.object_store.clone(),
72            path: path.to_string(),
73            read_byte_count,
74            read_count,
75            file_size_hint: None,
76        })
77    }
78
79    /// Returns an [`InstrumentedAsyncRead`] for the given path.
80    /// Metrics like the number of bytes read, read and seek operations
81    /// are recorded using the provided `IntCounter`s.
82    pub async fn reader<'a>(
83        &self,
84        path: &str,
85        read_byte_count: &'a IntCounter,
86        read_count: &'a IntCounter,
87        seek_count: &'a IntCounter,
88    ) -> Result<InstrumentedAsyncRead<'a, object_store::FuturesAsyncReader>> {
89        let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?;
90        let reader = self
91            .object_store
92            .reader(path)
93            .await
94            .context(OpenDalSnafu)?
95            .into_futures_async_read(0..meta.content_length())
96            .await
97            .context(OpenDalSnafu)?;
98        Ok(InstrumentedAsyncRead::new(
99            reader,
100            read_byte_count,
101            read_count,
102            seek_count,
103        ))
104    }
105
106    /// Returns an [`InstrumentedAsyncWrite`] for the given path.
107    /// Metrics like the number of bytes written, write and flush operations
108    /// are recorded using the provided `IntCounter`s.
109    pub async fn writer<'a>(
110        &self,
111        path: &str,
112        write_byte_count: &'a IntCounter,
113        write_count: &'a IntCounter,
114        flush_count: &'a IntCounter,
115    ) -> Result<InstrumentedAsyncWrite<'a, object_store::FuturesAsyncWriter>> {
116        let writer = match self.write_buffer_size {
117            Some(size) => self
118                .object_store
119                .writer_with(path)
120                .chunk(size)
121                .await
122                .context(OpenDalSnafu)?
123                .into_futures_async_write(),
124            None => self
125                .object_store
126                .writer(path)
127                .await
128                .context(OpenDalSnafu)?
129                .into_futures_async_write(),
130        };
131        Ok(InstrumentedAsyncWrite::new(
132            writer,
133            write_byte_count,
134            write_count,
135            flush_count,
136        ))
137    }
138
139    /// Proxies to [`ObjectStore::list`].
140    pub async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
141        let list = self.object_store.list(path).await.context(OpenDalSnafu)?;
142        Ok(list)
143    }
144
145    /// Recursively deletes all objects under the given path.
146    pub async fn remove_all(&self, path: &str) -> Result<()> {
147        self.object_store
148            .delete_with(path)
149            .recursive(true)
150            .await
151            .context(OpenDalSnafu)
152    }
153}
154
/// A wrapper around [`AsyncRead`] that adds instrumentation for monitoring
/// the number of bytes read and the number of read and seek operations.
///
/// Tallies are accumulated in `CounterGuard`s and added to the underlying
/// counters when this wrapper is dropped.
#[pin_project]
pub(crate) struct InstrumentedAsyncRead<'a, R> {
    /// The wrapped reader that all polls are forwarded to.
    #[pin]
    inner: R,
    /// Total number of bytes returned by successful reads.
    read_byte_count: CounterGuard<'a>,
    /// Number of reads that completed successfully.
    read_count: CounterGuard<'a>,
    /// Number of seeks that completed successfully.
    seek_count: CounterGuard<'a>,
}
164
165impl<'a, R> InstrumentedAsyncRead<'a, R> {
166    /// Create a new `InstrumentedAsyncRead`.
167    fn new(
168        inner: R,
169        read_byte_count: &'a IntCounter,
170        read_count: &'a IntCounter,
171        seek_count: &'a IntCounter,
172    ) -> Self {
173        Self {
174            inner,
175            read_byte_count: CounterGuard::new(read_byte_count),
176            read_count: CounterGuard::new(read_count),
177            seek_count: CounterGuard::new(seek_count),
178        }
179    }
180}
181
182impl<R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'_, R> {
183    fn poll_read(
184        mut self: Pin<&mut Self>,
185        cx: &mut Context<'_>,
186        buf: &mut [u8],
187    ) -> Poll<io::Result<usize>> {
188        let poll = self.as_mut().project().inner.poll_read(cx, buf);
189        if let Poll::Ready(Ok(n)) = &poll {
190            self.read_count.inc_by(1);
191            self.read_byte_count.inc_by(*n);
192        }
193        poll
194    }
195}
196
197impl<R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'_, R> {
198    fn poll_seek(
199        mut self: Pin<&mut Self>,
200        cx: &mut Context<'_>,
201        pos: io::SeekFrom,
202    ) -> Poll<io::Result<u64>> {
203        let poll = self.as_mut().project().inner.poll_seek(cx, pos);
204        if let Poll::Ready(Ok(_)) = &poll {
205            self.seek_count.inc_by(1);
206        }
207        poll
208    }
209}
210
/// A wrapper around [`AsyncWrite`] that adds instrumentation for monitoring
/// the number of bytes written and the number of write and flush operations.
///
/// Tallies are accumulated in `CounterGuard`s and added to the underlying
/// counters when this wrapper is dropped.
#[pin_project]
pub(crate) struct InstrumentedAsyncWrite<'a, W> {
    /// The wrapped writer that all polls are forwarded to.
    #[pin]
    inner: W,
    /// Total number of bytes accepted by successful writes.
    write_byte_count: CounterGuard<'a>,
    /// Number of writes that completed successfully.
    write_count: CounterGuard<'a>,
    /// Number of flushes that completed successfully.
    flush_count: CounterGuard<'a>,
}
220
221impl<'a, W> InstrumentedAsyncWrite<'a, W> {
222    /// Create a new `InstrumentedAsyncWrite`.
223    fn new(
224        inner: W,
225        write_byte_count: &'a IntCounter,
226        write_count: &'a IntCounter,
227        flush_count: &'a IntCounter,
228    ) -> Self {
229        Self {
230            inner,
231            write_byte_count: CounterGuard::new(write_byte_count),
232            write_count: CounterGuard::new(write_count),
233            flush_count: CounterGuard::new(flush_count),
234        }
235    }
236}
237
238impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'_, W> {
239    fn poll_write(
240        mut self: Pin<&mut Self>,
241        cx: &mut Context<'_>,
242        buf: &[u8],
243    ) -> Poll<io::Result<usize>> {
244        let poll = self.as_mut().project().inner.poll_write(cx, buf);
245        if let Poll::Ready(Ok(n)) = &poll {
246            self.write_count.inc_by(1);
247            self.write_byte_count.inc_by(*n);
248        }
249        poll
250    }
251
252    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
253        let poll = self.as_mut().project().inner.poll_flush(cx);
254        if let Poll::Ready(Ok(())) = &poll {
255            self.flush_count.inc_by(1);
256        }
257        poll
258    }
259
260    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
261        self.project().inner.poll_close(cx)
262    }
263}
264
/// Implements `RangeReader` for a single object in an `ObjectStore` and
/// records read metrics.
pub(crate) struct InstrumentedRangeReader<'a> {
    /// The object store the reads are issued against.
    store: ObjectStore,
    /// Path of the object to read.
    path: String,
    /// Counter incremented by the number of bytes returned by each read.
    read_byte_count: &'a IntCounter,
    /// Counter incremented by one for each read call.
    read_count: &'a IntCounter,
    /// Optional file size; when set, it is reported as the content length
    /// instead of querying the store.
    file_size_hint: Option<u64>,
}
273
274impl SizeAwareRangeReader for InstrumentedRangeReader<'_> {
275    fn with_file_size_hint(&mut self, file_size_hint: u64) {
276        self.file_size_hint = Some(file_size_hint);
277    }
278}
279
280#[async_trait]
281impl RangeReader for InstrumentedRangeReader<'_> {
282    async fn metadata(&self) -> io::Result<Metadata> {
283        match self.file_size_hint {
284            Some(file_size_hint) => Ok(Metadata {
285                content_length: file_size_hint,
286            }),
287            None => {
288                let stat = self.store.stat(&self.path).await?;
289                Ok(Metadata {
290                    content_length: stat.content_length(),
291                })
292            }
293        }
294    }
295
296    async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
297        let buf = self.store.reader(&self.path).await?.read(range).await?;
298        self.read_byte_count.inc_by(buf.len() as _);
299        self.read_count.inc_by(1);
300        Ok(buf.to_bytes())
301    }
302
303    async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
304        let reader = self.store.reader(&self.path).await?;
305        let size = reader.read_into(buf, range).await?;
306        self.read_byte_count.inc_by(size as _);
307        self.read_count.inc_by(1);
308        Ok(())
309    }
310
311    async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
312        let bufs = self
313            .store
314            .reader(&self.path)
315            .await?
316            .fetch(ranges.to_owned())
317            .await?;
318        let total_size: usize = bufs.iter().map(|buf| buf.len()).sum();
319        self.read_byte_count.inc_by(total_size as _);
320        self.read_count.inc_by(1);
321        Ok(bufs.into_iter().map(|buf| buf.to_bytes()).collect())
322    }
323}
324
325/// A guard that increments a counter when dropped.
326struct CounterGuard<'a> {
327    count: usize,
328    counter: &'a IntCounter,
329}
330
331impl<'a> CounterGuard<'a> {
332    /// Create a new `CounterGuard`.
333    fn new(counter: &'a IntCounter) -> Self {
334        Self { count: 0, counter }
335    }
336
337    /// Increment the counter by `n`.
338    fn inc_by(&mut self, n: usize) {
339        self.count += n;
340    }
341}
342
343impl Drop for CounterGuard<'_> {
344    fn drop(&mut self) {
345        if self.count > 0 {
346            self.counter.inc_by(self.count as _);
347        }
348    }
349}
350
#[cfg(test)]
mod tests {
    use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use object_store::services::Memory;

    use super::*;

    /// Writes a small object, reads it back twice with a seek in between, and
    /// checks every counter once the instrumented handles have been dropped.
    #[tokio::test]
    async fn test_instrumented_store_read_write() {
        let new_counter = |name: &str| IntCounter::new(name, name).unwrap();
        let read_byte_count = new_counter("read_byte_count");
        let read_count = new_counter("read_count");
        let seek_count = new_counter("seek_count");
        let write_byte_count = new_counter("write_byte_count");
        let write_count = new_counter("write_count");
        let flush_count = new_counter("flush_count");

        let backend = ObjectStore::new(Memory::default()).unwrap().finish();
        let instrumented_store = InstrumentedStore::new(backend);

        // The tallies reach the counters only when the handle is dropped, so
        // each handle lives in its own scope that ends before the assertions.
        {
            let mut writer = instrumented_store
                .writer("my_file", &write_byte_count, &write_count, &flush_count)
                .await
                .unwrap();
            writer.write_all(b"hello").await.unwrap();
            writer.flush().await.unwrap();
            writer.close().await.unwrap();
        }

        {
            let mut reader = instrumented_store
                .reader("my_file", &read_byte_count, &read_count, &seek_count)
                .await
                .unwrap();
            let mut buf = [0u8; 5];
            reader.read_exact(&mut buf).await.unwrap();
            reader.seek(io::SeekFrom::Start(0)).await.unwrap();
            reader.read_exact(&mut buf).await.unwrap();
        }

        assert_eq!(read_byte_count.get(), 10);
        assert_eq!(read_count.get(), 2);
        assert_eq!(seek_count.get(), 1);
        assert_eq!(write_byte_count.get(), 5);
        assert_eq!(write_count.get(), 1);
        assert_eq!(flush_count.get(), 1);
    }
}
396}