mito2/sst/index/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io;
16use std::ops::Range;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use async_trait::async_trait;
21use bytes::{BufMut, Bytes};
22use common_base::range_read::{Metadata, RangeReader, SizeAwareRangeReader};
23use futures::{AsyncRead, AsyncSeek, AsyncWrite};
24use object_store::ObjectStore;
25use pin_project::pin_project;
26use prometheus::IntCounter;
27use snafu::ResultExt;
28
29use crate::error::{OpenDalSnafu, Result};
30
/// A wrapper around [`ObjectStore`] that hands out instrumented readers and
/// writers. The returned handles record metrics such as bytes read, bytes
/// written, and the number of read, write, seek and flush operations into
/// caller-supplied `IntCounter`s.
///
/// TODO: Consider refactoring `InstrumentedStore` to use async in trait instead of `AsyncRead`.
#[derive(Clone)]
pub(crate) struct InstrumentedStore {
    /// The underlying object store that all operations are forwarded to.
    object_store: ObjectStore,
    /// The size of the write buffer. When `Some`, it is passed as the chunk
    /// size when opening a writer; when `None`, the plain writer is used.
    write_buffer_size: Option<usize>,
}
42
43impl InstrumentedStore {
44    /// Create a new `InstrumentedStore`.
45    pub fn new(object_store: ObjectStore) -> Self {
46        Self {
47            object_store,
48            write_buffer_size: None,
49        }
50    }
51
52    /// Set the size of the write buffer.
53    pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
54        self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);
55        self
56    }
57
58    /// Returns an [`InstrumentedRangeReader`] for the given path.
59    /// Metrics like the number of bytes read are recorded using the provided `IntCounter`.
60    pub async fn range_reader<'a>(
61        &self,
62        path: &str,
63        read_byte_count: &'a IntCounter,
64        read_count: &'a IntCounter,
65    ) -> Result<InstrumentedRangeReader<'a>> {
66        Ok(InstrumentedRangeReader {
67            store: self.object_store.clone(),
68            path: path.to_string(),
69            read_byte_count,
70            read_count,
71            file_size_hint: None,
72        })
73    }
74
75    /// Returns an [`InstrumentedAsyncRead`] for the given path.
76    /// Metrics like the number of bytes read, read and seek operations
77    /// are recorded using the provided `IntCounter`s.
78    pub async fn reader<'a>(
79        &self,
80        path: &str,
81        read_byte_count: &'a IntCounter,
82        read_count: &'a IntCounter,
83        seek_count: &'a IntCounter,
84    ) -> Result<InstrumentedAsyncRead<'a, object_store::FuturesAsyncReader>> {
85        let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?;
86        let reader = self
87            .object_store
88            .reader(path)
89            .await
90            .context(OpenDalSnafu)?
91            .into_futures_async_read(0..meta.content_length())
92            .await
93            .context(OpenDalSnafu)?;
94        Ok(InstrumentedAsyncRead::new(
95            reader,
96            read_byte_count,
97            read_count,
98            seek_count,
99        ))
100    }
101
102    /// Returns an [`InstrumentedAsyncWrite`] for the given path.
103    /// Metrics like the number of bytes written, write and flush operations
104    /// are recorded using the provided `IntCounter`s.
105    pub async fn writer<'a>(
106        &self,
107        path: &str,
108        write_byte_count: &'a IntCounter,
109        write_count: &'a IntCounter,
110        flush_count: &'a IntCounter,
111    ) -> Result<InstrumentedAsyncWrite<'a, object_store::FuturesAsyncWriter>> {
112        let writer = match self.write_buffer_size {
113            Some(size) => self
114                .object_store
115                .writer_with(path)
116                .chunk(size)
117                .await
118                .context(OpenDalSnafu)?
119                .into_futures_async_write(),
120            None => self
121                .object_store
122                .writer(path)
123                .await
124                .context(OpenDalSnafu)?
125                .into_futures_async_write(),
126        };
127        Ok(InstrumentedAsyncWrite::new(
128            writer,
129            write_byte_count,
130            write_count,
131            flush_count,
132        ))
133    }
134
135    /// Proxies to [`ObjectStore::list`].
136    pub async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
137        let list = self.object_store.list(path).await.context(OpenDalSnafu)?;
138        Ok(list)
139    }
140
141    /// Proxies to [`ObjectStore::remove_all`].
142    pub async fn remove_all(&self, path: &str) -> Result<()> {
143        self.object_store
144            .remove_all(path)
145            .await
146            .context(OpenDalSnafu)
147    }
148}
149
/// A wrapper around an [`AsyncRead`] (and optionally [`AsyncSeek`]) value that
/// adds instrumentation for monitoring bytes read, read operations and seek
/// operations.
///
/// Counts are accumulated in [`CounterGuard`]s and only added to the
/// underlying `IntCounter`s when this wrapper is dropped.
#[pin_project]
pub(crate) struct InstrumentedAsyncRead<'a, R> {
    /// The wrapped reader that all polls are forwarded to.
    #[pin]
    inner: R,
    /// Total number of bytes returned by successful reads.
    read_byte_count: CounterGuard<'a>,
    /// Number of successful `poll_read` completions.
    read_count: CounterGuard<'a>,
    /// Number of successful `poll_seek` completions.
    seek_count: CounterGuard<'a>,
}
159
160impl<'a, R> InstrumentedAsyncRead<'a, R> {
161    /// Create a new `InstrumentedAsyncRead`.
162    fn new(
163        inner: R,
164        read_byte_count: &'a IntCounter,
165        read_count: &'a IntCounter,
166        seek_count: &'a IntCounter,
167    ) -> Self {
168        Self {
169            inner,
170            read_byte_count: CounterGuard::new(read_byte_count),
171            read_count: CounterGuard::new(read_count),
172            seek_count: CounterGuard::new(seek_count),
173        }
174    }
175}
176
177impl<R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'_, R> {
178    fn poll_read(
179        mut self: Pin<&mut Self>,
180        cx: &mut Context<'_>,
181        buf: &mut [u8],
182    ) -> Poll<io::Result<usize>> {
183        let poll = self.as_mut().project().inner.poll_read(cx, buf);
184        if let Poll::Ready(Ok(n)) = &poll {
185            self.read_count.inc_by(1);
186            self.read_byte_count.inc_by(*n);
187        }
188        poll
189    }
190}
191
192impl<R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'_, R> {
193    fn poll_seek(
194        mut self: Pin<&mut Self>,
195        cx: &mut Context<'_>,
196        pos: io::SeekFrom,
197    ) -> Poll<io::Result<u64>> {
198        let poll = self.as_mut().project().inner.poll_seek(cx, pos);
199        if let Poll::Ready(Ok(_)) = &poll {
200            self.seek_count.inc_by(1);
201        }
202        poll
203    }
204}
205
/// A wrapper around an [`AsyncWrite`] value that adds instrumentation for
/// monitoring bytes written, write operations and flush operations.
///
/// Counts are accumulated in [`CounterGuard`]s and only added to the
/// underlying `IntCounter`s when this wrapper is dropped.
#[pin_project]
pub(crate) struct InstrumentedAsyncWrite<'a, W> {
    /// The wrapped writer that all polls are forwarded to.
    #[pin]
    inner: W,
    /// Total number of bytes accepted by successful writes.
    write_byte_count: CounterGuard<'a>,
    /// Number of successful `poll_write` completions.
    write_count: CounterGuard<'a>,
    /// Number of successful `poll_flush` completions.
    flush_count: CounterGuard<'a>,
}
215
216impl<'a, W> InstrumentedAsyncWrite<'a, W> {
217    /// Create a new `InstrumentedAsyncWrite`.
218    fn new(
219        inner: W,
220        write_byte_count: &'a IntCounter,
221        write_count: &'a IntCounter,
222        flush_count: &'a IntCounter,
223    ) -> Self {
224        Self {
225            inner,
226            write_byte_count: CounterGuard::new(write_byte_count),
227            write_count: CounterGuard::new(write_count),
228            flush_count: CounterGuard::new(flush_count),
229        }
230    }
231}
232
233impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'_, W> {
234    fn poll_write(
235        mut self: Pin<&mut Self>,
236        cx: &mut Context<'_>,
237        buf: &[u8],
238    ) -> Poll<io::Result<usize>> {
239        let poll = self.as_mut().project().inner.poll_write(cx, buf);
240        if let Poll::Ready(Ok(n)) = &poll {
241            self.write_count.inc_by(1);
242            self.write_byte_count.inc_by(*n);
243        }
244        poll
245    }
246
247    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
248        let poll = self.as_mut().project().inner.poll_flush(cx);
249        if let Poll::Ready(Ok(())) = &poll {
250            self.flush_count.inc_by(1);
251        }
252        poll
253    }
254
255    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
256        self.project().inner.poll_close(cx)
257    }
258}
259
/// Implements `RangeReader` on top of an `ObjectStore` path and records read
/// metrics directly into the provided `IntCounter`s after each read.
pub(crate) struct InstrumentedRangeReader<'a> {
    /// The object store used to open readers and stat the object.
    store: ObjectStore,
    /// Path of the object being read.
    path: String,
    /// Counter that receives the number of bytes returned by each read.
    read_byte_count: &'a IntCounter,
    /// Counter incremented by one for each read call that succeeds.
    read_count: &'a IntCounter,
    /// Optional caller-supplied file size; when set, `metadata` returns it
    /// instead of stat-ing the object.
    file_size_hint: Option<u64>,
}
268
269impl SizeAwareRangeReader for InstrumentedRangeReader<'_> {
270    fn with_file_size_hint(&mut self, file_size_hint: u64) {
271        self.file_size_hint = Some(file_size_hint);
272    }
273}
274
275#[async_trait]
276impl RangeReader for InstrumentedRangeReader<'_> {
277    async fn metadata(&self) -> io::Result<Metadata> {
278        match self.file_size_hint {
279            Some(file_size_hint) => Ok(Metadata {
280                content_length: file_size_hint,
281            }),
282            None => {
283                let stat = self.store.stat(&self.path).await?;
284                Ok(Metadata {
285                    content_length: stat.content_length(),
286                })
287            }
288        }
289    }
290
291    async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
292        let buf = self.store.reader(&self.path).await?.read(range).await?;
293        self.read_byte_count.inc_by(buf.len() as _);
294        self.read_count.inc_by(1);
295        Ok(buf.to_bytes())
296    }
297
298    async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
299        let reader = self.store.reader(&self.path).await?;
300        let size = reader.read_into(buf, range).await?;
301        self.read_byte_count.inc_by(size as _);
302        self.read_count.inc_by(1);
303        Ok(())
304    }
305
306    async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
307        let bufs = self
308            .store
309            .reader(&self.path)
310            .await?
311            .fetch(ranges.to_owned())
312            .await?;
313        let total_size: usize = bufs.iter().map(|buf| buf.len()).sum();
314        self.read_byte_count.inc_by(total_size as _);
315        self.read_count.inc_by(1);
316        Ok(bufs.into_iter().map(|buf| buf.to_bytes()).collect())
317    }
318}
319
/// A guard that accumulates increments locally and adds the accumulated total
/// to the wrapped counter when dropped.
struct CounterGuard<'a> {
    /// Increments accumulated so far and not yet added to `counter`.
    count: usize,
    /// The counter that receives `count` on drop.
    counter: &'a IntCounter,
}
325
326impl<'a> CounterGuard<'a> {
327    /// Create a new `CounterGuard`.
328    fn new(counter: &'a IntCounter) -> Self {
329        Self { count: 0, counter }
330    }
331
332    /// Increment the counter by `n`.
333    fn inc_by(&mut self, n: usize) {
334        self.count += n;
335    }
336}
337
338impl Drop for CounterGuard<'_> {
339    fn drop(&mut self) {
340        if self.count > 0 {
341            self.counter.inc_by(self.count as _);
342        }
343    }
344}
345
#[cfg(test)]
mod tests {
    use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use object_store::services::Memory;

    use super::*;

    /// Writes a small object through an instrumented writer, reads it back
    /// twice (with a seek in between) through an instrumented reader, and
    /// checks every counter against the operations performed.
    #[tokio::test]
    async fn test_instrumented_store_read_write() {
        // Back the store with the `Memory` service so the test needs no external storage.
        let instrumented_store =
            InstrumentedStore::new(ObjectStore::new(Memory::default()).unwrap().finish());

        let read_byte_count = IntCounter::new("read_byte_count", "read_byte_count").unwrap();
        let read_count = IntCounter::new("read_count", "read_count").unwrap();
        let seek_count = IntCounter::new("seek_count", "seek_count").unwrap();
        let write_byte_count = IntCounter::new("write_byte_count", "write_byte_count").unwrap();
        let write_count = IntCounter::new("write_count", "write_count").unwrap();
        let flush_count = IntCounter::new("flush_count", "flush_count").unwrap();

        let mut writer = instrumented_store
            .writer("my_file", &write_byte_count, &write_count, &flush_count)
            .await
            .unwrap();
        writer.write_all(b"hello").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();
        // The guards only publish to the counters on drop, so drop explicitly
        // before asserting.
        drop(writer);

        let mut reader = instrumented_store
            .reader("my_file", &read_byte_count, &read_count, &seek_count)
            .await
            .unwrap();
        let mut buf = vec![0; 5];
        // Read the 5 bytes, rewind to the start, and read them again.
        reader.read_exact(&mut buf).await.unwrap();
        reader.seek(io::SeekFrom::Start(0)).await.unwrap();
        reader.read_exact(&mut buf).await.unwrap();
        // Same as for the writer: drop to publish the accumulated counts.
        drop(reader);

        // Two 5-byte reads and one seek.
        assert_eq!(read_byte_count.get(), 10);
        assert_eq!(read_count.get(), 2);
        assert_eq!(seek_count.get(), 1);
        // One 5-byte write and one explicit flush.
        assert_eq!(write_byte_count.get(), 5);
        assert_eq!(write_count.get(), 1);
        assert_eq!(flush_count.get(), 1);
    }
}