1use std::io;
16use std::ops::Range;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use async_trait::async_trait;
21use bytes::{BufMut, Bytes};
22use common_base::range_read::{Metadata, RangeReader, SizeAwareRangeReader};
23use futures::{AsyncRead, AsyncSeek, AsyncWrite};
24use object_store::ObjectStore;
25use pin_project::pin_project;
26use prometheus::IntCounter;
27use snafu::ResultExt;
28
29use crate::error::{OpenDalSnafu, Result};
30
31#[derive(Clone)]
36pub(crate) struct InstrumentedStore {
37 object_store: ObjectStore,
39 write_buffer_size: Option<usize>,
41}
42
43impl InstrumentedStore {
44 pub fn new(object_store: ObjectStore) -> Self {
46 Self {
47 object_store,
48 write_buffer_size: None,
49 }
50 }
51
52 pub fn store(&self) -> &ObjectStore {
53 &self.object_store
54 }
55
56 pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
58 self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);
59 self
60 }
61
62 pub async fn range_reader<'a>(
65 &self,
66 path: &str,
67 read_byte_count: &'a IntCounter,
68 read_count: &'a IntCounter,
69 ) -> Result<InstrumentedRangeReader<'a>> {
70 Ok(InstrumentedRangeReader {
71 store: self.object_store.clone(),
72 path: path.to_string(),
73 read_byte_count,
74 read_count,
75 file_size_hint: None,
76 })
77 }
78
79 pub async fn reader<'a>(
83 &self,
84 path: &str,
85 read_byte_count: &'a IntCounter,
86 read_count: &'a IntCounter,
87 seek_count: &'a IntCounter,
88 ) -> Result<InstrumentedAsyncRead<'a, object_store::FuturesAsyncReader>> {
89 let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?;
90 let reader = self
91 .object_store
92 .reader(path)
93 .await
94 .context(OpenDalSnafu)?
95 .into_futures_async_read(0..meta.content_length())
96 .await
97 .context(OpenDalSnafu)?;
98 Ok(InstrumentedAsyncRead::new(
99 reader,
100 read_byte_count,
101 read_count,
102 seek_count,
103 ))
104 }
105
106 pub async fn writer<'a>(
110 &self,
111 path: &str,
112 write_byte_count: &'a IntCounter,
113 write_count: &'a IntCounter,
114 flush_count: &'a IntCounter,
115 ) -> Result<InstrumentedAsyncWrite<'a, object_store::FuturesAsyncWriter>> {
116 let writer = match self.write_buffer_size {
117 Some(size) => self
118 .object_store
119 .writer_with(path)
120 .chunk(size)
121 .await
122 .context(OpenDalSnafu)?
123 .into_futures_async_write(),
124 None => self
125 .object_store
126 .writer(path)
127 .await
128 .context(OpenDalSnafu)?
129 .into_futures_async_write(),
130 };
131 Ok(InstrumentedAsyncWrite::new(
132 writer,
133 write_byte_count,
134 write_count,
135 flush_count,
136 ))
137 }
138
139 pub async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
141 let list = self.object_store.list(path).await.context(OpenDalSnafu)?;
142 Ok(list)
143 }
144
145 pub async fn remove_all(&self, path: &str) -> Result<()> {
147 self.object_store
148 .delete_with(path)
149 .recursive(true)
150 .await
151 .context(OpenDalSnafu)
152 }
153}
154
155#[pin_project]
157pub(crate) struct InstrumentedAsyncRead<'a, R> {
158 #[pin]
159 inner: R,
160 read_byte_count: CounterGuard<'a>,
161 read_count: CounterGuard<'a>,
162 seek_count: CounterGuard<'a>,
163}
164
165impl<'a, R> InstrumentedAsyncRead<'a, R> {
166 fn new(
168 inner: R,
169 read_byte_count: &'a IntCounter,
170 read_count: &'a IntCounter,
171 seek_count: &'a IntCounter,
172 ) -> Self {
173 Self {
174 inner,
175 read_byte_count: CounterGuard::new(read_byte_count),
176 read_count: CounterGuard::new(read_count),
177 seek_count: CounterGuard::new(seek_count),
178 }
179 }
180}
181
182impl<R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'_, R> {
183 fn poll_read(
184 mut self: Pin<&mut Self>,
185 cx: &mut Context<'_>,
186 buf: &mut [u8],
187 ) -> Poll<io::Result<usize>> {
188 let poll = self.as_mut().project().inner.poll_read(cx, buf);
189 if let Poll::Ready(Ok(n)) = &poll {
190 self.read_count.inc_by(1);
191 self.read_byte_count.inc_by(*n);
192 }
193 poll
194 }
195}
196
197impl<R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'_, R> {
198 fn poll_seek(
199 mut self: Pin<&mut Self>,
200 cx: &mut Context<'_>,
201 pos: io::SeekFrom,
202 ) -> Poll<io::Result<u64>> {
203 let poll = self.as_mut().project().inner.poll_seek(cx, pos);
204 if let Poll::Ready(Ok(_)) = &poll {
205 self.seek_count.inc_by(1);
206 }
207 poll
208 }
209}
210
211#[pin_project]
213pub(crate) struct InstrumentedAsyncWrite<'a, W> {
214 #[pin]
215 inner: W,
216 write_byte_count: CounterGuard<'a>,
217 write_count: CounterGuard<'a>,
218 flush_count: CounterGuard<'a>,
219}
220
221impl<'a, W> InstrumentedAsyncWrite<'a, W> {
222 fn new(
224 inner: W,
225 write_byte_count: &'a IntCounter,
226 write_count: &'a IntCounter,
227 flush_count: &'a IntCounter,
228 ) -> Self {
229 Self {
230 inner,
231 write_byte_count: CounterGuard::new(write_byte_count),
232 write_count: CounterGuard::new(write_count),
233 flush_count: CounterGuard::new(flush_count),
234 }
235 }
236}
237
238impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'_, W> {
239 fn poll_write(
240 mut self: Pin<&mut Self>,
241 cx: &mut Context<'_>,
242 buf: &[u8],
243 ) -> Poll<io::Result<usize>> {
244 let poll = self.as_mut().project().inner.poll_write(cx, buf);
245 if let Poll::Ready(Ok(n)) = &poll {
246 self.write_count.inc_by(1);
247 self.write_byte_count.inc_by(*n);
248 }
249 poll
250 }
251
252 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
253 let poll = self.as_mut().project().inner.poll_flush(cx);
254 if let Poll::Ready(Ok(())) = &poll {
255 self.flush_count.inc_by(1);
256 }
257 poll
258 }
259
260 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
261 self.project().inner.poll_close(cx)
262 }
263}
264
265pub(crate) struct InstrumentedRangeReader<'a> {
267 store: ObjectStore,
268 path: String,
269 read_byte_count: &'a IntCounter,
270 read_count: &'a IntCounter,
271 file_size_hint: Option<u64>,
272}
273
274impl SizeAwareRangeReader for InstrumentedRangeReader<'_> {
275 fn with_file_size_hint(&mut self, file_size_hint: u64) {
276 self.file_size_hint = Some(file_size_hint);
277 }
278}
279
280#[async_trait]
281impl RangeReader for InstrumentedRangeReader<'_> {
282 async fn metadata(&self) -> io::Result<Metadata> {
283 match self.file_size_hint {
284 Some(file_size_hint) => Ok(Metadata {
285 content_length: file_size_hint,
286 }),
287 None => {
288 let stat = self.store.stat(&self.path).await?;
289 Ok(Metadata {
290 content_length: stat.content_length(),
291 })
292 }
293 }
294 }
295
296 async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
297 let buf = self.store.reader(&self.path).await?.read(range).await?;
298 self.read_byte_count.inc_by(buf.len() as _);
299 self.read_count.inc_by(1);
300 Ok(buf.to_bytes())
301 }
302
303 async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
304 let reader = self.store.reader(&self.path).await?;
305 let size = reader.read_into(buf, range).await?;
306 self.read_byte_count.inc_by(size as _);
307 self.read_count.inc_by(1);
308 Ok(())
309 }
310
311 async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
312 let bufs = self
313 .store
314 .reader(&self.path)
315 .await?
316 .fetch(ranges.to_owned())
317 .await?;
318 let total_size: usize = bufs.iter().map(|buf| buf.len()).sum();
319 self.read_byte_count.inc_by(total_size as _);
320 self.read_count.inc_by(1);
321 Ok(bufs.into_iter().map(|buf| buf.to_bytes()).collect())
322 }
323}
324
325struct CounterGuard<'a> {
327 count: usize,
328 counter: &'a IntCounter,
329}
330
331impl<'a> CounterGuard<'a> {
332 fn new(counter: &'a IntCounter) -> Self {
334 Self { count: 0, counter }
335 }
336
337 fn inc_by(&mut self, n: usize) {
339 self.count += n;
340 }
341}
342
343impl Drop for CounterGuard<'_> {
344 fn drop(&mut self) {
345 if self.count > 0 {
346 self.counter.inc_by(self.count as _);
347 }
348 }
349}
350
#[cfg(test)]
mod tests {
    use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use object_store::services::Memory;

    use super::*;

    /// Builds an unregistered counter whose name doubles as its help text.
    fn counter(name: &str) -> IntCounter {
        IntCounter::new(name, name).unwrap()
    }

    #[tokio::test]
    async fn test_instrumented_store_read_write() {
        let memory_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let instrumented_store = InstrumentedStore::new(memory_store);

        let read_byte_count = counter("read_byte_count");
        let read_count = counter("read_count");
        let seek_count = counter("seek_count");
        let write_byte_count = counter("write_byte_count");
        let write_count = counter("write_count");
        let flush_count = counter("flush_count");

        // Write phase; the writer is dropped at the end of the scope so its
        // buffered counts are published.
        {
            let mut writer = instrumented_store
                .writer("my_file", &write_byte_count, &write_count, &flush_count)
                .await
                .unwrap();
            writer.write_all(b"hello").await.unwrap();
            writer.flush().await.unwrap();
            writer.close().await.unwrap();
        }

        // Read phase: read, rewind, read again; dropped at scope end.
        {
            let mut reader = instrumented_store
                .reader("my_file", &read_byte_count, &read_count, &seek_count)
                .await
                .unwrap();
            let mut buf = [0u8; 5];
            reader.read_exact(&mut buf).await.unwrap();
            reader.seek(io::SeekFrom::Start(0)).await.unwrap();
            reader.read_exact(&mut buf).await.unwrap();
        }

        assert_eq!(read_byte_count.get(), 10);
        assert_eq!(read_count.get(), 2);
        assert_eq!(seek_count.get(), 1);
        assert_eq!(write_byte_count.get(), 5);
        assert_eq!(write_count.get(), 1);
        assert_eq!(flush_count.get(), 1);
    }
}