1use std::io;
16use std::ops::Range;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use async_trait::async_trait;
21use bytes::{BufMut, Bytes};
22use common_base::range_read::{Metadata, RangeReader, SizeAwareRangeReader};
23use futures::{AsyncRead, AsyncSeek, AsyncWrite};
24use object_store::ObjectStore;
25use pin_project::pin_project;
26use prometheus::IntCounter;
27use snafu::ResultExt;
28
29use crate::error::{OpenDalSnafu, Result};
30
31#[derive(Clone)]
36pub(crate) struct InstrumentedStore {
37 object_store: ObjectStore,
39 write_buffer_size: Option<usize>,
41}
42
43impl InstrumentedStore {
44 pub fn new(object_store: ObjectStore) -> Self {
46 Self {
47 object_store,
48 write_buffer_size: None,
49 }
50 }
51
52 pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
54 self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);
55 self
56 }
57
58 pub async fn range_reader<'a>(
61 &self,
62 path: &str,
63 read_byte_count: &'a IntCounter,
64 read_count: &'a IntCounter,
65 ) -> Result<InstrumentedRangeReader<'a>> {
66 Ok(InstrumentedRangeReader {
67 store: self.object_store.clone(),
68 path: path.to_string(),
69 read_byte_count,
70 read_count,
71 file_size_hint: None,
72 })
73 }
74
75 pub async fn reader<'a>(
79 &self,
80 path: &str,
81 read_byte_count: &'a IntCounter,
82 read_count: &'a IntCounter,
83 seek_count: &'a IntCounter,
84 ) -> Result<InstrumentedAsyncRead<'a, object_store::FuturesAsyncReader>> {
85 let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?;
86 let reader = self
87 .object_store
88 .reader(path)
89 .await
90 .context(OpenDalSnafu)?
91 .into_futures_async_read(0..meta.content_length())
92 .await
93 .context(OpenDalSnafu)?;
94 Ok(InstrumentedAsyncRead::new(
95 reader,
96 read_byte_count,
97 read_count,
98 seek_count,
99 ))
100 }
101
102 pub async fn writer<'a>(
106 &self,
107 path: &str,
108 write_byte_count: &'a IntCounter,
109 write_count: &'a IntCounter,
110 flush_count: &'a IntCounter,
111 ) -> Result<InstrumentedAsyncWrite<'a, object_store::FuturesAsyncWriter>> {
112 let writer = match self.write_buffer_size {
113 Some(size) => self
114 .object_store
115 .writer_with(path)
116 .chunk(size)
117 .await
118 .context(OpenDalSnafu)?
119 .into_futures_async_write(),
120 None => self
121 .object_store
122 .writer(path)
123 .await
124 .context(OpenDalSnafu)?
125 .into_futures_async_write(),
126 };
127 Ok(InstrumentedAsyncWrite::new(
128 writer,
129 write_byte_count,
130 write_count,
131 flush_count,
132 ))
133 }
134
135 pub async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
137 let list = self.object_store.list(path).await.context(OpenDalSnafu)?;
138 Ok(list)
139 }
140
141 pub async fn remove_all(&self, path: &str) -> Result<()> {
143 self.object_store
144 .remove_all(path)
145 .await
146 .context(OpenDalSnafu)
147 }
148}
149
/// An `AsyncRead` + `AsyncSeek` wrapper that counts reads, bytes read and
/// seeks performed on the inner reader.
///
/// Counts are buffered in `CounterGuard`s and only added to the underlying
/// `IntCounter`s when this wrapper is dropped.
#[pin_project]
pub(crate) struct InstrumentedAsyncRead<'a, R> {
    /// The wrapped reader; structurally pinned.
    #[pin]
    inner: R,
    // NOTE: fields drop in declaration order, so the guards below flush their
    // counts after `inner` has been dropped.
    read_byte_count: CounterGuard<'a>,
    read_count: CounterGuard<'a>,
    seek_count: CounterGuard<'a>,
}
159
160impl<'a, R> InstrumentedAsyncRead<'a, R> {
161 fn new(
163 inner: R,
164 read_byte_count: &'a IntCounter,
165 read_count: &'a IntCounter,
166 seek_count: &'a IntCounter,
167 ) -> Self {
168 Self {
169 inner,
170 read_byte_count: CounterGuard::new(read_byte_count),
171 read_count: CounterGuard::new(read_count),
172 seek_count: CounterGuard::new(seek_count),
173 }
174 }
175}
176
177impl<R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'_, R> {
178 fn poll_read(
179 mut self: Pin<&mut Self>,
180 cx: &mut Context<'_>,
181 buf: &mut [u8],
182 ) -> Poll<io::Result<usize>> {
183 let poll = self.as_mut().project().inner.poll_read(cx, buf);
184 if let Poll::Ready(Ok(n)) = &poll {
185 self.read_count.inc_by(1);
186 self.read_byte_count.inc_by(*n);
187 }
188 poll
189 }
190}
191
192impl<R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'_, R> {
193 fn poll_seek(
194 mut self: Pin<&mut Self>,
195 cx: &mut Context<'_>,
196 pos: io::SeekFrom,
197 ) -> Poll<io::Result<u64>> {
198 let poll = self.as_mut().project().inner.poll_seek(cx, pos);
199 if let Poll::Ready(Ok(_)) = &poll {
200 self.seek_count.inc_by(1);
201 }
202 poll
203 }
204}
205
/// An `AsyncWrite` wrapper that counts writes, bytes written and flushes
/// performed on the inner writer. Closing is forwarded but not counted.
///
/// Counts are buffered in `CounterGuard`s and only added to the underlying
/// `IntCounter`s when this wrapper is dropped.
#[pin_project]
pub(crate) struct InstrumentedAsyncWrite<'a, W> {
    /// The wrapped writer; structurally pinned.
    #[pin]
    inner: W,
    // NOTE: fields drop in declaration order, so the guards below flush their
    // counts after `inner` has been dropped.
    write_byte_count: CounterGuard<'a>,
    write_count: CounterGuard<'a>,
    flush_count: CounterGuard<'a>,
}
215
216impl<'a, W> InstrumentedAsyncWrite<'a, W> {
217 fn new(
219 inner: W,
220 write_byte_count: &'a IntCounter,
221 write_count: &'a IntCounter,
222 flush_count: &'a IntCounter,
223 ) -> Self {
224 Self {
225 inner,
226 write_byte_count: CounterGuard::new(write_byte_count),
227 write_count: CounterGuard::new(write_count),
228 flush_count: CounterGuard::new(flush_count),
229 }
230 }
231}
232
233impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'_, W> {
234 fn poll_write(
235 mut self: Pin<&mut Self>,
236 cx: &mut Context<'_>,
237 buf: &[u8],
238 ) -> Poll<io::Result<usize>> {
239 let poll = self.as_mut().project().inner.poll_write(cx, buf);
240 if let Poll::Ready(Ok(n)) = &poll {
241 self.write_count.inc_by(1);
242 self.write_byte_count.inc_by(*n);
243 }
244 poll
245 }
246
247 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
248 let poll = self.as_mut().project().inner.poll_flush(cx);
249 if let Poll::Ready(Ok(())) = &poll {
250 self.flush_count.inc_by(1);
251 }
252 poll
253 }
254
255 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
256 self.project().inner.poll_close(cx)
257 }
258}
259
260pub(crate) struct InstrumentedRangeReader<'a> {
262 store: ObjectStore,
263 path: String,
264 read_byte_count: &'a IntCounter,
265 read_count: &'a IntCounter,
266 file_size_hint: Option<u64>,
267}
268
269impl SizeAwareRangeReader for InstrumentedRangeReader<'_> {
270 fn with_file_size_hint(&mut self, file_size_hint: u64) {
271 self.file_size_hint = Some(file_size_hint);
272 }
273}
274
275#[async_trait]
276impl RangeReader for InstrumentedRangeReader<'_> {
277 async fn metadata(&self) -> io::Result<Metadata> {
278 match self.file_size_hint {
279 Some(file_size_hint) => Ok(Metadata {
280 content_length: file_size_hint,
281 }),
282 None => {
283 let stat = self.store.stat(&self.path).await?;
284 Ok(Metadata {
285 content_length: stat.content_length(),
286 })
287 }
288 }
289 }
290
291 async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
292 let buf = self.store.reader(&self.path).await?.read(range).await?;
293 self.read_byte_count.inc_by(buf.len() as _);
294 self.read_count.inc_by(1);
295 Ok(buf.to_bytes())
296 }
297
298 async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
299 let reader = self.store.reader(&self.path).await?;
300 let size = reader.read_into(buf, range).await?;
301 self.read_byte_count.inc_by(size as _);
302 self.read_count.inc_by(1);
303 Ok(())
304 }
305
306 async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
307 let bufs = self
308 .store
309 .reader(&self.path)
310 .await?
311 .fetch(ranges.to_owned())
312 .await?;
313 let total_size: usize = bufs.iter().map(|buf| buf.len()).sum();
314 self.read_byte_count.inc_by(total_size as _);
315 self.read_count.inc_by(1);
316 Ok(bufs.into_iter().map(|buf| buf.to_bytes()).collect())
317 }
318}
319
320struct CounterGuard<'a> {
322 count: usize,
323 counter: &'a IntCounter,
324}
325
326impl<'a> CounterGuard<'a> {
327 fn new(counter: &'a IntCounter) -> Self {
329 Self { count: 0, counter }
330 }
331
332 fn inc_by(&mut self, n: usize) {
334 self.count += n;
335 }
336}
337
338impl Drop for CounterGuard<'_> {
339 fn drop(&mut self) {
340 if self.count > 0 {
341 self.counter.inc_by(self.count as _);
342 }
343 }
344}
345
#[cfg(test)]
mod tests {
    use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use object_store::services::Memory;

    use super::*;

    /// Builds an unregistered counter whose name and help text are both `name`.
    fn counter(name: &str) -> IntCounter {
        IntCounter::new(name, name).unwrap()
    }

    #[tokio::test]
    async fn test_instrumented_store_read_write() {
        let store = InstrumentedStore::new(ObjectStore::new(Memory::default()).unwrap().finish());

        let read_bytes = counter("read_byte_count");
        let reads = counter("read_count");
        let seeks = counter("seek_count");
        let written_bytes = counter("write_byte_count");
        let writes = counter("write_count");
        let flushes = counter("flush_count");

        // The writer's guards flush into the counters when this scope ends.
        {
            let mut writer = store
                .writer("my_file", &written_bytes, &writes, &flushes)
                .await
                .unwrap();
            writer.write_all(b"hello").await.unwrap();
            writer.flush().await.unwrap();
            writer.close().await.unwrap();
        }

        // Same for the reader: its counts become visible once it is dropped.
        {
            let mut reader = store
                .reader("my_file", &read_bytes, &reads, &seeks)
                .await
                .unwrap();
            let mut scratch = [0u8; 5];
            reader.read_exact(&mut scratch).await.unwrap();
            reader.seek(io::SeekFrom::Start(0)).await.unwrap();
            reader.read_exact(&mut scratch).await.unwrap();
        }

        assert_eq!(written_bytes.get(), 5);
        assert_eq!(writes.get(), 1);
        assert_eq!(flushes.get(), 1);

        assert_eq!(read_bytes.get(), 10);
        assert_eq!(reads.get(), 2);
        assert_eq!(seeks.get(), 1);
    }
}